DRILL-1068: Reduce memory consumption of various operators. Add more memory stats to query profile.
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/c3eea13c Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/c3eea13c Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/c3eea13c Branch: refs/heads/master Commit: c3eea13ce4c45e6bcd0b6863571e1561a9fa1931 Parents: 0dfeac8 Author: Jacques Nadeau <[email protected]> Authored: Sat Jun 21 13:01:43 2014 -0700 Committer: Jacques Nadeau <[email protected]> Committed: Wed Jun 25 09:09:39 2014 -0700 ---------------------------------------------------------------------- .../apache/drill/exec/ops/FragmentStats.java | 6 +- .../apache/drill/exec/ops/OperatorContext.java | 2 +- .../apache/drill/exec/ops/OperatorStats.java | 16 +++- .../org/apache/drill/exec/ops/SenderStats.java | 11 +-- .../drill/exec/physical/impl/BaseRootExec.java | 4 +- .../exec/physical/impl/TopN/TopNBatch.java | 6 +- .../impl/aggregate/HashAggTemplate.java | 66 ++++++++-------- .../impl/aggregate/StreamingAggBatch.java | 5 +- .../impl/aggregate/StreamingAggTemplate.java | 14 ++-- .../impl/aggregate/StreamingAggregator.java | 5 +- .../physical/impl/filter/FilterRecordBatch.java | 7 +- .../exec/physical/impl/join/HashJoinBatch.java | 17 ++-- .../impl/join/HashJoinProbeTemplate.java | 82 ++++++++++++-------- .../impl/mergereceiver/MergingRecordBatch.java | 28 +++---- .../PartitionSenderRootExec.java | 4 +- .../partitionsender/PartitionerTemplate.java | 7 +- .../exec/physical/impl/svremover/Copier.java | 2 +- .../impl/svremover/CopierTemplate2.java | 9 +-- .../impl/svremover/CopierTemplate4.java | 7 +- .../impl/svremover/RemovingRecordBatch.java | 17 +--- .../planner/fragment/SimpleParallelizer.java | 3 +- .../drill/exec/record/VectorContainer.java | 21 ++++- .../exec/vector/allocator/VectorAllocator.java | 29 +++---- 23 files changed, 184 insertions(+), 184 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c3eea13c/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentStats.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentStats.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentStats.java index 19ac0aa..22872f9 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentStats.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentStats.java @@ -19,7 +19,7 @@ package org.apache.drill.exec.ops; import java.util.List; -import org.apache.drill.exec.proto.BitControl.FragmentStatus; +import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.proto.UserBitShared.MinorFragmentProfile; import com.codahale.metrics.MetricRegistry; @@ -46,8 +46,8 @@ public class FragmentStats { } } - public OperatorStats getOperatorStats(OpProfileDef profileDef){ - OperatorStats stats = new OperatorStats(profileDef); + public OperatorStats getOperatorStats(OpProfileDef profileDef, BufferAllocator allocator){ + OperatorStats stats = new OperatorStats(profileDef, allocator); if(profileDef.operatorType != -1){ operators.add(stats); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c3eea13c/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java index d62ea2f..2d46733 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java @@ -37,7 +37,7 @@ public class OperatorContext implements Closeable { this.popConfig = popConfig; OpProfileDef def = new OpProfileDef(popConfig.getOperatorId(), popConfig.getOperatorType(), getChildCount(popConfig)); - this.stats = context.getStats().getOperatorStats(def); + this.stats = context.getStats().getOperatorStats(def, allocator); } public OperatorContext(PhysicalOperator popConfig, FragmentContext context, OperatorStats stats) throws OutOfMemoryException { http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c3eea13c/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java index dcb73c8..bd8d899 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java @@ -17,7 +17,7 @@ */ package org.apache.drill.exec.ops; -import org.apache.commons.collections.Buffer; +import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.proto.UserBitShared.MetricValue; import org.apache.drill.exec.proto.UserBitShared.OperatorProfile; import org.apache.drill.exec.proto.UserBitShared.StreamProfile; @@ -30,6 +30,7 @@ public class OperatorStats { protected final int operatorId; protected final int operatorType; + private final BufferAllocator allocator; private IntLongOpenHashMap longMetrics = new IntLongOpenHashMap(); private IntDoubleOpenHashMap doubleMetrics = new IntDoubleOpenHashMap(); @@ -53,12 +54,13 @@ public class OperatorStats { private long schemas; - public OperatorStats(OpProfileDef def){ - this(def.getOperatorId(), def.getOperatorType(), def.getIncomingCount()); + public OperatorStats(OpProfileDef def, BufferAllocator allocator){ + this(def.getOperatorId(), def.getOperatorType(), def.getIncomingCount(), allocator); } - private OperatorStats(int operatorId, int operatorType, int inputCount) { + private OperatorStats(int operatorId, int operatorType, int inputCount, BufferAllocator allocator) { super(); + this.allocator = allocator; this.operatorId = operatorId; this.operatorType = operatorType; this.recordsReceivedByInput = new long[inputCount]; @@ -126,6 +128,12 @@ public class OperatorStats { .setProcessNanos(processingNanos) .setWaitNanos(waitNanos); + if(allocator != null){ + b.setLocalMemoryAllocated(allocator.getAllocatedMemory()); + } + + + addAllMetrics(b); return b.build(); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c3eea13c/exec/java-exec/src/main/java/org/apache/drill/exec/ops/SenderStats.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/SenderStats.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/SenderStats.java index c766632..5167edb 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/SenderStats.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/SenderStats.java @@ -17,21 +17,22 @@ */ package org.apache.drill.exec.ops; +import java.util.List; + +import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.physical.base.PhysicalOperator; -import org.apache.drill.exec.physical.impl.partitionsender.PartitionStatsBatch; import org.apache.drill.exec.physical.impl.partitionsender.PartitionSenderStats; +import org.apache.drill.exec.physical.impl.partitionsender.PartitionStatsBatch; import org.apache.drill.exec.proto.UserBitShared; -import java.util.List; - public class SenderStats extends OperatorStats { long minReceiverRecordCount = 0; long maxReceiverRecordCount = 0; int nSenders = 0; - public SenderStats(PhysicalOperator operator) { - super(new OpProfileDef(operator.getOperatorId(), operator.getOperatorType(), OperatorContext.getChildCount(operator))); + public SenderStats(PhysicalOperator operator, BufferAllocator allocator) { + super(new OpProfileDef(operator.getOperatorId(), operator.getOperatorType(), OperatorContext.getChildCount(operator)), allocator); } public void updatePartitionStats(List<? extends PartitionStatsBatch> outgoing) { http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c3eea13c/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java index 452052b..d56da51 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java @@ -32,9 +32,9 @@ public abstract class BaseRootExec implements RootExec { protected OperatorContext oContext = null; public BaseRootExec(FragmentContext context, PhysicalOperator config) throws OutOfMemoryException { - this.stats = new SenderStats(config); - context.getStats().addOperatorStats(this.stats); this.oContext = new OperatorContext(config, context, stats); + this.stats = new SenderStats(config, oContext.getAllocator()); + context.getStats().addOperatorStats(this.stats); } @Override http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c3eea13c/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java index 4a5d368..846d419 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java @@ -219,14 +219,12 @@ public class TopNBatch extends AbstractRecordBatch<TopN> { if (copier == null) { copier = RemovingRecordBatch.getGenerated4Copier(batch, context, oContext.getAllocator(), newContainer, newBatch); } else { - List<VectorAllocator> allocators = Lists.newArrayList(); for(VectorWrapper<?> i : batch){ ValueVector v = TypeHelper.getNewVector(i.getField(), oContext.getAllocator()); newContainer.add(v); - allocators.add(RemovingRecordBatch.getAllocator4(v)); } - copier.setupRemover(context, batch, newBatch, allocators.toArray(new VectorAllocator[allocators.size()])); + copier.setupRemover(context, batch, newBatch); } SortRecordBatchBuilder builder = new SortRecordBatchBuilder(oContext.getAllocator(), MAX_SORT_BYTES); do { @@ -372,7 +370,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> { public Iterator<VectorWrapper<?>> iterator() { return container.iterator(); } - + @Override public VectorContainer getOutgoingContainer() { throw new UnsupportedOperationException(String.format(" You should not call getOutgoingContainer() for class %s", this.getClass().getCanonicalName())); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c3eea13c/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java index 935bbb3..1afa5ae 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java @@ -148,7 +148,7 @@ public abstract class HashAggTemplate implements HashAggregator { private void clear() { aggrValuesContainer.clear(); } - + private int getNumGroups() { return maxOccupiedIdx + 1; } @@ -156,7 +156,7 @@ public abstract class HashAggTemplate implements HashAggregator { private int getOutputCount() { return batchOutputCount; } - + // Code-generated methods (implemented in HashAggBatch) @RuntimeOverridden @@ -242,7 +242,8 @@ public abstract class HashAggTemplate implements HashAggregator { if (EXTRA_DEBUG_1) logger.debug ("Starting outer loop of doWork()..."); for (; underlyingIndex < incoming.getRecordCount(); incIndex()) { if(EXTRA_DEBUG_2) logger.debug("Doing loop with values underlying {}, current {}", underlyingIndex, currentIndex); - checkGroupAndAggrValues(currentIndex); + boolean success = checkGroupAndAggrValues(currentIndex); + assert success : "HashAgg couldn't copy values."; } if (EXTRA_DEBUG_1) logger.debug("Processed {} records", underlyingIndex); @@ -273,7 +274,8 @@ public abstract class HashAggTemplate implements HashAggregator { if(incoming.getRecordCount() == 0){ continue; } else { - checkGroupAndAggrValues(currentIndex); + boolean success = checkGroupAndAggrValues(currentIndex); + assert success : "HashAgg couldn't copy values."; incIndex(); if(EXTRA_DEBUG_1) logger.debug("Continuing outside loop"); @@ -282,16 +284,16 @@ public abstract class HashAggTemplate implements HashAggregator { case NONE: // outcome = out; - + buildComplete = true; - + updateStats(htable); - // output the first batch; remaining batches will be output + // output the first batch; remaining batches will be output // in response to each next() call by a downstream operator - + outputCurrentBatch(); - + // cleanup incoming batch since output of aggregation does not need // any references to the incoming @@ -314,7 +316,7 @@ public abstract class HashAggTemplate implements HashAggregator { if(first) first = !first; } } - + private void allocateOutgoing(int numOutputRecords) { for (VectorAllocator a : keyAllocators) { @@ -326,7 +328,7 @@ public abstract class HashAggTemplate implements HashAggregator { if(EXTRA_DEBUG_2) logger.debug("Outgoing batch: Allocating {} with {} records.", a, numOutputRecords); a.alloc(numOutputRecords); } - + } @Override @@ -398,12 +400,12 @@ public abstract class HashAggTemplate implements HashAggregator { bh.setup(); } - + // output the keys and values for a particular batch holder private boolean outputKeysAndValues(int batchIdx) { - + allocateOutgoing(batchIdx); - + if (! this.htable.outputKeys(batchIdx)) { return false; } @@ -412,14 +414,14 @@ public abstract class HashAggTemplate implements HashAggregator { } outBatchIndex = batchIdx+1; - + if (outBatchIndex == batchHolders.size()) { allFlushed = true; } - + return true; } - + public IterOutcome outputCurrentBatch() { if (outBatchIndex >= batchHolders.size()) { this.outcome = IterOutcome.NONE; @@ -433,35 +435,35 @@ public abstract class HashAggTemplate implements HashAggregator { this.outcome = IterOutcome.NONE; return outcome; } - + allocateOutgoing(batchOutputRecords); - + boolean outputKeysStatus = this.htable.outputKeys(outBatchIndex) ; - boolean outputValuesStatus = batchHolders.get(outBatchIndex).outputValues(); + boolean outputValuesStatus = batchHolders.get(outBatchIndex).outputValues(); if (outputKeysStatus && outputValuesStatus) { - + // set the value count for outgoing batch value vectors for(VectorWrapper<?> v : outgoing) { v.getValueVector().getMutator().setValueCount(batchOutputRecords); } - + outputCount += batchOutputRecords; - + if(first){ this.outcome = IterOutcome.OK_NEW_SCHEMA; }else{ this.outcome = IterOutcome.OK; } - + logger.debug("HashAggregate: Output current batch index {} with {} records.", outBatchIndex, batchOutputRecords); lastBatchOutputCount = batchOutputRecords; outBatchIndex++; if (outBatchIndex == batchHolders.size()) { allFlushed = true; - + logger.debug("HashAggregate: All batches flushed."); - + // cleanup my internal state since there is nothing more to return this.cleanup(); } @@ -470,18 +472,18 @@ public abstract class HashAggTemplate implements HashAggregator { if (!outputValuesStatus) context.fail(new Exception("Failed to output values for current batch !")); this.outcome = IterOutcome.STOP; } - + return this.outcome; } - + public boolean allFlushed() { return allFlushed; } - + public boolean buildComplete() { return buildComplete; } - + public int numGroupedRecords() { return numGroupedRecords; } @@ -545,12 +547,12 @@ public abstract class HashAggTemplate implements HashAggregator { return false; } - + private void updateStats(HashTable htable) { htable.getStats(htStats); this.stats.addLongStat(HashTableMetrics.HTABLE_NUM_BUCKETS, htStats.numBuckets); this.stats.addLongStat(HashTableMetrics.HTABLE_NUM_ENTRIES, htStats.numEntries); - this.stats.addLongStat(HashTableMetrics.HTABLE_NUM_RESIZING, htStats.numResizing); + this.stats.addLongStat(HashTableMetrics.HTABLE_NUM_RESIZING, htStats.numResizing); } // Code-generated methods (implemented in HashAggBatch) http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c3eea13c/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java index 367d2c7..b587ad1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java @@ -168,7 +168,6 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> { private StreamingAggregator createAggregatorInternal() throws SchemaChangeException, ClassTransformationException, IOException{ ClassGenerator<StreamingAggregator> cg = CodeGenerator.getRoot(StreamingAggTemplate.TEMPLATE_DEFINITION, context.getFunctionRegistry()); container.clear(); - List<VectorAllocator> allocators = Lists.newArrayList(); LogicalExpression[] keyExprs = new LogicalExpression[popConfig.getKeys().length]; LogicalExpression[] valueExprs = new LogicalExpression[popConfig.getExprs().length]; @@ -183,7 +182,6 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> { keyExprs[i] = expr; final MaterializedField outputField = MaterializedField.create(ne.getRef(), expr.getMajorType()); ValueVector vector = TypeHelper.getNewVector(outputField, oContext.getAllocator()); - allocators.add(VectorAllocator.getAllocator(vector, 50)); keyOutputIds[i] = container.add(vector); } @@ -194,7 +192,6 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> { final MaterializedField outputField = MaterializedField.create(ne.getRef(), expr.getMajorType()); ValueVector vector = TypeHelper.getNewVector(outputField, oContext.getAllocator()); - allocators.add(VectorAllocator.getAllocator(vector, 50)); TypedFieldId id = container.add(vector); valueExprs[i] = new ValueVectorWriteExpression(id, expr, true); } @@ -212,7 +209,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> { container.buildSchema(SelectionVectorMode.NONE); StreamingAggregator agg = context.getImplementationClass(cg); - agg.setup(context, incoming, this, allocators.toArray(new VectorAllocator[allocators.size()])); + agg.setup(context, incoming, this); return agg; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c3eea13c/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java index 3bd861d..4d6e7c4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java @@ -25,6 +25,7 @@ import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.RecordBatch.IterOutcome; import org.apache.drill.exec.record.VectorWrapper; +import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.allocator.VectorAllocator; public abstract class StreamingAggTemplate implements StreamingAggregator { @@ -44,18 +45,15 @@ public abstract class StreamingAggTemplate implements StreamingAggregator { private RecordBatch incoming; private BatchSchema schema; private StreamingAggBatch outgoing; - private VectorAllocator[] allocators; private FragmentContext context; private InternalBatch remainderBatch; @Override - public void setup(FragmentContext context, RecordBatch incoming, StreamingAggBatch outgoing, VectorAllocator[] allocators) throws SchemaChangeException { - this.allocators = allocators; + public void setup(FragmentContext context, RecordBatch incoming, StreamingAggBatch outgoing) throws SchemaChangeException { this.context = context; this.incoming = incoming; this.schema = incoming.getSchema(); - this.allocators = allocators; this.outgoing = outgoing; setupInterior(incoming, outgoing); this.currentIndex = incoming.getRecordCount() == 0 ? 0 : this.getVectorIndex(underlyingIndex); @@ -63,10 +61,8 @@ public abstract class StreamingAggTemplate implements StreamingAggregator { private void allocateOutgoing() { - for (VectorAllocator a : allocators) { - if(EXTRA_DEBUG) logger.debug("Allocating {} with {} records.", a, 20000); - a.alloc(20000); - if(EXTRA_DEBUG) logger.debug("Allocated {}", a); + for(VectorWrapper<?> w : outgoing){ + w.getValueVector().allocateNew(); } } @@ -90,7 +86,7 @@ public abstract class StreamingAggTemplate implements StreamingAggregator { public AggOutcome doWork() { try{ // outside loop to ensure that first is set to false after the first run. outputCount = 0; - + // if we're in the first state, allocate outgoing. if(first){ allocateOutgoing(); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c3eea13c/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java index 52f30ae..c624c9a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java @@ -31,9 +31,8 @@ public interface StreamingAggregator { public static enum AggOutcome { RETURN_OUTCOME, CLEANUP_AND_RETURN, UPDATE_AGGREGATOR; } - - public abstract void setup(FragmentContext context, RecordBatch incoming, StreamingAggBatch outgoing, - VectorAllocator[] allocators) throws SchemaChangeException; + + public abstract void setup(FragmentContext context, RecordBatch incoming, StreamingAggBatch outgoing) throws SchemaChangeException; public abstract IterOutcome getOutcome(); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c3eea13c/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java index d52d2e3..89a6d09 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java @@ -55,7 +55,7 @@ public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter>{ public FilterRecordBatch(Filter pop, RecordBatch incoming, FragmentContext context) throws OutOfMemoryException { super(pop, context, incoming); } - + @Override public FragmentContext getContext() { return context; @@ -85,8 +85,8 @@ public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter>{ // m.setValueCount(recordCount); // } } - - + + @Override public void cleanup() { if(sv2 != null) sv2.clear(); @@ -128,7 +128,6 @@ public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter>{ protected Filterer generateSV4Filterer() throws SchemaChangeException { final ErrorCollector collector = new ErrorCollectorImpl(); final List<TransferPair> transfers = Lists.newArrayList(); - final List<VectorAllocator> allocators = Lists.newArrayList(); final ClassGenerator<Filterer> cg = CodeGenerator.getRoot(Filterer.TEMPLATE_DEFINITION4, context.getFunctionRegistry()); final LogicalExpression expr = ExpressionTreeMaterializer.materialize(popConfig.getExpr(), incoming, collector, context.getFunctionRegistry()); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c3eea13c/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java index 1c028d0..bd0e23f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java @@ -103,9 +103,6 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> { // Current batch index on the build side private int buildBatchIndex = 0; - // List of vector allocators - private List<VectorAllocator> allocators = null; - // Schema of the build side private BatchSchema rightSchema = null; @@ -261,8 +258,8 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> { } } - HashTableConfig htConfig = - new HashTableConfig(context.getOptions().getOption(ExecConstants.MIN_HASH_TABLE_SIZE_KEY).num_val.intValue(), + HashTableConfig htConfig = + new HashTableConfig(context.getOptions().getOption(ExecConstants.MIN_HASH_TABLE_SIZE_KEY).num_val.intValue(), HashTable.DEFAULT_LOAD_FACTOR, rightExpr, leftExpr); // Create the chained hash table @@ -346,7 +343,7 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> { public HashJoinProbe setupHashJoinProbe() throws ClassTransformationException, IOException { - allocators = new ArrayList<>(); + final CodeGenerator<HashJoinProbe> cg = CodeGenerator.get(HashJoinProbe.TEMPLATE_DEFINITION, context.getFunctionRegistry()); ClassGenerator<HashJoinProbe> g = cg.getRoot(); @@ -374,7 +371,6 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> { // Add the vector to our output container ValueVector v = TypeHelper.getNewVector(MaterializedField.create(vv.getField().getPath(), outputType), context.getAllocator()); container.add(v); - allocators.add(RemovingRecordBatch.getAllocator4(v)); JVar inVV = g.declareVectorValueSetupAndMember("buildBatch", new TypedFieldId(vv.getField().getType(), true, fieldId)); JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(outputType, false, fieldId)); @@ -410,7 +406,6 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> { ValueVector v = TypeHelper.getNewVector(MaterializedField.create(vv.getField().getPath(), outputType), oContext.getAllocator()); container.add(v); - allocators.add(RemovingRecordBatch.getAllocator4(v)); JVar inVV = g.declareVectorValueSetupAndMember("probeBatch", new TypedFieldId(inputType, false, fieldId)); JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(outputType, false, outputFieldId)); @@ -432,9 +427,9 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> { } private void allocateVectors(){ - for(VectorAllocator a : allocators){ - a.alloc(RecordBatch.MAX_BATCH_SIZE); - } + for(VectorWrapper<?> v : container){ + v.getValueVector().allocateNew(); + } } public HashJoinBatch(HashJoinPOP popConfig, FragmentContext context, RecordBatch left, RecordBatch right) throws OutOfMemoryException { http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c3eea13c/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java index a4cc662..21c4ae7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java @@ -41,9 +41,11 @@ public abstract class HashJoinProbeTemplate implements HashJoinProbe { // Join type, INNER, LEFT, RIGHT or OUTER private JoinRelType joinType; - + private HashJoinBatch outgoingJoinBatch = null; + private static final int TARGET_RECORDS_PER_BATCH = 4000; + /* Helper class * Maintains linked list of build side records with the same key * Keeps information about which build records have a corresponding @@ -91,14 +93,21 @@ public abstract class HashJoinProbeTemplate implements HashJoinProbe { } public void executeProjectRightPhase() { - while (outputRecords < RecordBatch.MAX_BATCH_SIZE && recordsProcessed < recordsToProcess) { - boolean success = projectBuildRecord(unmatchedBuildIndexes.get(recordsProcessed++), outputRecords++); - assert success; + boolean success = true; + while (outputRecords < TARGET_RECORDS_PER_BATCH && recordsProcessed < recordsToProcess) { + success = projectBuildRecord(unmatchedBuildIndexes.get(recordsProcessed), outputRecords); + if(success){ + recordsProcessed++; + outputRecords++; + }else{ + if(outputRecords == 0) throw new IllegalStateException("Too big to fail."); + break; + } } } public void executeProbePhase() throws SchemaChangeException { - while (outputRecords < RecordBatch.MAX_BATCH_SIZE && recordsToProcess > 0) { + while (outputRecords < TARGET_RECORDS_PER_BATCH && recordsToProcess > 0) { // Check if we have processed all records in this batch we need to invoke next if (recordsProcessed == recordsToProcess) { @@ -157,31 +166,34 @@ public abstract class HashJoinProbeTemplate implements HashJoinProbe { */ hjHelper.setRecordMatched(currentCompositeIdx); - boolean success = projectBuildRecord(currentCompositeIdx, outputRecords); - assert success; - success = projectProbeRecord(recordsProcessed, outputRecords); - assert success; - outputRecords++; + boolean success = projectBuildRecord(currentCompositeIdx, outputRecords) // + && projectProbeRecord(recordsProcessed, outputRecords); + if(!success){ + // we failed to project. redo this record. + getNextRecord = false; + }else{ + outputRecords++; - /* Projected single row from the build side with matching key but there - * may be more rows with the same key. Check if that's the case - */ - currentCompositeIdx = hjHelper.getNextIndex(currentCompositeIdx); - if (currentCompositeIdx == -1) { - /* We only had one row in the build side that matched the current key - * from the probe side. Drain the next row in the probe side. + /* Projected single row from the build side with matching key but there + * may be more rows with the same key. Check if that's the case */ - recordsProcessed++; + currentCompositeIdx = hjHelper.getNextIndex(currentCompositeIdx); + if (currentCompositeIdx == -1) { + /* We only had one row in the build side that matched the current key + * from the probe side. Drain the next row in the probe side. + */ + recordsProcessed++; + } + else { + /* There is more than one row with the same key on the build side + * don't drain more records from the probe side till we have projected + * all the rows with this key + */ + getNextRecord = false; + } } - else { - /* There is more than one row with the same key on the build side - * don't drain more records from the probe side till we have projected - * all the rows with this key - */ - getNextRecord = false; - } - } - else { // No matching key + + } else { // No matching key // If we have a left outer join, project the keys if (joinType == JoinRelType.LEFT || joinType == JoinRelType.FULL) { @@ -190,12 +202,18 @@ public abstract class HashJoinProbeTemplate implements HashJoinProbe { } recordsProcessed++; } - } - else { + } else { hjHelper.setRecordMatched(currentCompositeIdx); - boolean success = projectBuildRecord(currentCompositeIdx, outputRecords); - assert success; - success = projectProbeRecord(recordsProcessed, outputRecords); + boolean success = projectBuildRecord(currentCompositeIdx, outputRecords) // + && projectProbeRecord(recordsProcessed, outputRecords); + if(!success){ + if(outputRecords == 0){ + throw new IllegalStateException("Record larger than single batch."); + }else{ + // we've output some records but failed to output this one. return and wait for next call. + return; + } + } assert success; outputRecords++; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c3eea13c/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java index 9351844..a16d64c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java @@ -90,14 +90,12 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> private boolean hasRun = false; private boolean prevBatchWasFull = false; private boolean hasMoreIncoming = true; - private final int DEFAULT_ALLOC_RECORD_COUNT = 20000; private int outgoingPosition = 0; private int senderCount = 0; private RawFragmentBatch[] incomingBatches; private int[] batchOffsets; private PriorityQueue <Node> pqueue; - private List<VectorAllocator> allocators; private RawFragmentBatch emptyBatch = null; private boolean done = false; @@ -117,7 +115,6 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> super(config, context); this.fragProviders = fragProviders; this.context = context; - this.allocators = Lists.newArrayList(); this.outgoingContainer = new VectorContainer(); } @@ -243,14 +240,11 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> // allocate a new value vector ValueVector outgoingVector = TypeHelper.getNewVector(v.getField(), oContext.getAllocator()); - VectorAllocator allocator = VectorAllocator.getAllocator(outgoingVector, 50); - allocator.alloc(DEFAULT_ALLOC_RECORD_COUNT); - allocators.add(allocator); + outgoingVector.allocateNew(); outgoingContainer.add(outgoingVector); ++vectorCount; } - logger.debug("Allocating {} outgoing vectors with {} values", vectorCount, DEFAULT_ALLOC_RECORD_COUNT); schema = bldr.build(); if (schema != null && !schema.equals(schema)) { @@ -295,11 +289,11 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> } pqueue.poll(); - if (isOutgoingFull()) { - // set a flag so that we reallocate on the next iteration - logger.debug("Outgoing vectors record batch size reached; breaking"); - prevBatchWasFull = true; - } +// if (isOutgoingFull()) { +// // set a flag so that we reallocate on the next iteration +// logger.debug("Outgoing vectors record batch size reached; breaking"); +// prevBatchWasFull = true; +// } if (node.valueIndex == batchLoaders[node.batchId].getRecordCount() - 1) { // reached the end of an incoming record batch @@ -443,14 +437,12 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> } private void allocateOutgoing() { - for (VectorAllocator allocator : allocators) { - allocator.alloc(DEFAULT_ALLOC_RECORD_COUNT); - } + outgoingContainer.allocateNew(); } - private boolean isOutgoingFull() { - return outgoingPosition == DEFAULT_ALLOC_RECORD_COUNT; - } +// private boolean isOutgoingFull() { +// return outgoingPosition == DEFAULT_ALLOC_RECORD_COUNT; +// } /** * Creates a generate class which implements the copy and compare methods. http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c3eea13c/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java index c4844d5..5fcfdcc 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java @@ -139,7 +139,7 @@ public class PartitionSenderRootExec extends BaseRootExec { return false; } stats.updatePartitionStats(partitioner.getOutgoingBatches()); - for (VectorWrapper v : incoming) { + for (VectorWrapper<?> v : incoming) { v.clear(); } return true; @@ -156,8 +156,6 @@ public class PartitionSenderRootExec extends BaseRootExec { final ErrorCollector collector = new ErrorCollectorImpl(); final ClassGenerator<Partitioner> cg ; - boolean hyper = false; - cg = CodeGenerator.getRoot(Partitioner.TEMPLATE_DEFINITION, context.getFunctionRegistry()); ClassGenerator<Partitioner> cgInner = cg.getInnerGenerator("OutgoingRecordBatch"); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c3eea13c/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java index 6a26d30..0d967b5 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java @@ -212,8 +212,7 @@ public abstract class PartitionerTemplate implements Partitioner { private int recordCount; private int totalRecords; private OperatorStats stats; - private static final int DEFAULT_RECORD_BATCH_SIZE = 20000; - private static final int DEFAULT_VARIABLE_WIDTH_SIZE = 200; + private static final int DEFAULT_RECORD_BATCH_SIZE = 1000; private final StatusHandler statusHandler; @@ -301,7 +300,7 @@ public abstract class PartitionerTemplate implements Partitioner { recordCount = 0; vectorContainer.zeroVectors(); for (VectorWrapper<?> v : vectorContainer) { - VectorAllocator.getAllocator(v.getValueVector(), DEFAULT_VARIABLE_WIDTH_SIZE).alloc(DEFAULT_RECORD_BATCH_SIZE); + v.getValueVector().allocateNew(); } if (!statusHandler.isOk()) { throw new IOException(statusHandler.getException()); @@ -320,7 +319,7 @@ public abstract class PartitionerTemplate implements Partitioner { // allocate a new value vector ValueVector outgoingVector = TypeHelper.getNewVector(v.getField(), allocator); - VectorAllocator.getAllocator(outgoingVector, DEFAULT_VARIABLE_WIDTH_SIZE).alloc(DEFAULT_RECORD_BATCH_SIZE); + outgoingVector.allocateNew(); vectorContainer.add(outgoingVector); } outSchema = bldr.build(); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c3eea13c/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/Copier.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/Copier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/Copier.java index 0aab7b2..dfc37c6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/Copier.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/Copier.java @@ -27,7 +27,7 @@ public interface Copier { public static TemplateClassDefinition<Copier> TEMPLATE_DEFINITION2 = new TemplateClassDefinition<Copier>(Copier.class, CopierTemplate2.class); public static TemplateClassDefinition<Copier> TEMPLATE_DEFINITION4 = new TemplateClassDefinition<Copier>(Copier.class, CopierTemplate4.class); - public void setupRemover(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, VectorAllocator[] allocators) throws SchemaChangeException; + public void setupRemover(FragmentContext context, RecordBatch incoming, RecordBatch outgoing) throws SchemaChangeException; public abstract int copyRecords(int index, int recordCount); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c3eea13c/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate2.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate2.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate2.java index 387497c..5cc308a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate2.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate2.java @@ -20,26 +20,21 @@ package org.apache.drill.exec.physical.impl.svremover; import javax.inject.Named; import org.apache.drill.exec.exception.SchemaChangeException; -import org.apache.drill.exec.expr.TypeHelper; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.VectorWrapper; import org.apache.drill.exec.record.selection.SelectionVector2; -import org.apache.drill.exec.vector.ValueVector; -import org.apache.drill.exec.vector.allocator.VectorAllocator; public abstract class CopierTemplate2 implements Copier{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CopierTemplate2.class); private SelectionVector2 sv2; - private RecordBatch incoming; private RecordBatch outgoing; @Override - public void setupRemover(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, VectorAllocator[] allocators) throws SchemaChangeException{ + public void setupRemover(FragmentContext context, RecordBatch incoming, RecordBatch outgoing) throws SchemaChangeException{ this.sv2 = incoming.getSelectionVector2(); - this.incoming = incoming; this.outgoing = outgoing; doSetup(context, incoming, outgoing); } @@ -47,7 +42,7 @@ public abstract class CopierTemplate2 implements Copier{ @Override public int copyRecords(int index, int recordCount){ for(VectorWrapper<?> out : outgoing){ - out.getValueVector().allocateNewSafe(); + out.getValueVector().allocateNew(); } int outgoingPosition = 0; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c3eea13c/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate4.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate4.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate4.java index b48a8fd..7a1c029 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate4.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate4.java @@ -31,13 +31,11 @@ public abstract class CopierTemplate4 implements Copier{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CopierTemplate4.class); private SelectionVector4 sv4; - private RecordBatch incoming; private RecordBatch outgoing; @Override - public void setupRemover(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, VectorAllocator[] allocators) throws SchemaChangeException{ - this.incoming = incoming; + public void setupRemover(FragmentContext context, RecordBatch incoming, RecordBatch outgoing) throws SchemaChangeException{ this.outgoing = outgoing; this.sv4 = incoming.getSelectionVector4(); doSetup(context, incoming, outgoing); @@ -46,9 +44,8 @@ public abstract class CopierTemplate4 implements Copier{ @Override public int copyRecords(int index, int recordCount){ -// logger.debug("Copying records."); for(VectorWrapper<?> out : outgoing){ - out.getValueVector().allocateNewSafe(); + out.getValueVector().allocateNew(); } int outgoingPosition = 0; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c3eea13c/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java index 558ce63..42f2128 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java @@ -195,7 +195,7 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect private List<ValueVector> out = Lists.newArrayList(); @Override - public void setupRemover(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, VectorAllocator[] allocators){ + public void setupRemover(FragmentContext context, RecordBatch incoming, RecordBatch outgoing){ for(VectorWrapper<?> vv : incoming){ TransferPair tp = vv.getValueVector().getTransferPair(); pairs.add(tp); @@ -220,7 +220,7 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect private Copier getStraightCopier(){ StraightCopier copier = new StraightCopier(); - copier.setupRemover(context, incoming, this, null); + copier.setupRemover(context, incoming, this); container.addCollection(copier.getOut()); return copier; } @@ -237,7 +237,7 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect final CodeGenerator<Copier> cg = CodeGenerator.get(Copier.TEMPLATE_DEFINITION2, context.getFunctionRegistry()); CopyUtil.generateCopies(cg.getRoot(), incoming, false); Copier copier = context.getImplementationClass(cg); - copier.setupRemover(context, incoming, this, null); + copier.setupRemover(context, incoming, this); return copier; } catch (ClassTransformationException | IOException e) { @@ -262,7 +262,7 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect final CodeGenerator<Copier> cg = CodeGenerator.get(Copier.TEMPLATE_DEFINITION4, context.getFunctionRegistry()); CopyUtil.generateCopies(cg.getRoot(), batch, true); Copier copier = context.getImplementationClass(cg); - copier.setupRemover(context, batch, outgoing, null); + copier.setupRemover(context, batch, outgoing); return copier; } catch (ClassTransformationException | IOException e) { @@ -275,15 +275,6 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect return WritableBatch.get(this); } - public static VectorAllocator getAllocator4(ValueVector outgoing){ - if(outgoing instanceof FixedWidthVector){ - return new FixedVectorAllocator((FixedWidthVector) outgoing); - }else if(outgoing instanceof VariableWidthVector ){ - return new VariableEstimatedVector( (VariableWidthVector) outgoing, 250); - }else{ - throw new UnsupportedOperationException(); - } - } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c3eea13c/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java index 0ce480d..cea5460 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java @@ -60,7 +60,8 @@ public class SimpleParallelizer { private double affinityFactor; public SimpleParallelizer(QueryContext context){ - this.parallelizationThreshold = context.getOptions().getOption(ExecConstants.SLICE_TARGET).num_val; + long sliceTarget = context.getOptions().getOption(ExecConstants.SLICE_TARGET).num_val; + this.parallelizationThreshold = sliceTarget > 0 ? sliceTarget : 1; this.maxWidthPerNode = context.getOptions().getOption(ExecConstants.MAX_WIDTH_PER_NODE_KEY).num_val.intValue(); this.maxGlobalWidth = context.getOptions().getOption(ExecConstants.MAX_WIDTH_GLOBAL_KEY).num_val.intValue(); this.affinityFactor = context.getOptions().getOption(ExecConstants.AFFINITY_FACTOR_KEY).float_val.intValue(); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c3eea13c/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java index 1f2c33a..49c7399 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java @@ -42,7 +42,7 @@ public class VectorContainer extends AbstractMapVector implements Iterable<Vecto private BatchSchema schema; private int recordCount = -1; private final OperatorContext oContext; - + public VectorContainer() { this.oContext = null; } @@ -74,12 +74,12 @@ public class VectorContainer extends AbstractMapVector implements Iterable<Vecto add(vv, releasable); } - public <T extends ValueVector> T addOrGet(String name, MajorType type, Class<T> clazz){ + public <T extends ValueVector> T addOrGet(String name, MajorType type, Class<T> clazz){ MaterializedField field = MaterializedField.create(name, type); ValueVector v = TypeHelper.getNewVector(field, this.oContext.getAllocator()); - + add(v); - + if(clazz.isAssignableFrom(v.getClass())){ return (T) v; }else{ @@ -252,5 +252,18 @@ public class VectorContainer extends AbstractMapVector implements Iterable<Vecto return this.wrappers.size(); } + public void allocateNew(){ + for (VectorWrapper<?> w : wrappers) { + w.getValueVector().allocateNew(); + } + } + + public boolean allocateNewSafe(){ + for (VectorWrapper<?> w : wrappers) { + if(!w.getValueVector().allocateNewSafe()) return false; + } + + return true; + } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c3eea13c/exec/java-exec/src/main/java/org/apache/drill/exec/vector/allocator/VectorAllocator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/allocator/VectorAllocator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/allocator/VectorAllocator.java index 77b6e1c..eb01bef 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/allocator/VectorAllocator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/allocator/VectorAllocator.java @@ -22,22 +22,23 @@ import org.apache.drill.exec.vector.RepeatedVariableWidthVector; import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.VariableWidthVector; +@Deprecated public abstract class VectorAllocator{ public abstract void alloc(int recordCount); - - public static VectorAllocator getAllocator(ValueVector in, ValueVector outgoing){ - if(outgoing instanceof FixedWidthVector){ - return new FixedVectorAllocator((FixedWidthVector) outgoing); - }else if(outgoing instanceof VariableWidthVector && in instanceof VariableWidthVector){ - return new VariableVectorAllocator( (VariableWidthVector) in, (VariableWidthVector) outgoing); - } else if (outgoing instanceof RepeatedVariableWidthVector && in instanceof RepeatedVariableWidthVector) { - return new RepeatedVectorAllocator((RepeatedVariableWidthVector) in, (RepeatedVariableWidthVector) outgoing); - }else{ - throw new UnsupportedOperationException(); - } - } - - + +// public static VectorAllocator getAllocator(ValueVector in, ValueVector outgoing){ +// if(outgoing instanceof FixedWidthVector){ +// return new FixedVectorAllocator((FixedWidthVector) outgoing); +// }else if(outgoing instanceof VariableWidthVector && in instanceof VariableWidthVector){ +// return new VariableVectorAllocator( (VariableWidthVector) in, (VariableWidthVector) outgoing); +// } else if (outgoing instanceof RepeatedVariableWidthVector && in instanceof RepeatedVariableWidthVector) { +// return new RepeatedVectorAllocator((RepeatedVariableWidthVector) in, (RepeatedVariableWidthVector) outgoing); +// }else{ +// throw new UnsupportedOperationException(); +// } +// } + + @Deprecated public static VectorAllocator getAllocator(ValueVector outgoing, int averageBytesPerVariable){ if(outgoing instanceof FixedWidthVector){ return new FixedVectorAllocator((FixedWidthVector) outgoing);
