Support multiple output batches for hash aggregation.
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/1726d734 Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/1726d734 Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/1726d734 Branch: refs/heads/master Commit: 1726d734a8e7e90cdb12ad092c0b79eb6e4f3cb2 Parents: 3f21451 Author: Aman Sinha <[email protected]> Authored: Thu May 15 16:41:24 2014 -0700 Committer: Jacques Nadeau <[email protected]> Committed: Thu Jun 5 09:36:38 2014 -0700 ---------------------------------------------------------------------- .../physical/impl/aggregate/HashAggBatch.java | 24 +-- .../impl/aggregate/HashAggTemplate.java | 173 +++++++++++++++---- .../physical/impl/aggregate/HashAggregator.java | 5 + .../exec/physical/impl/common/HashTable.java | 2 +- .../physical/impl/common/HashTableTemplate.java | 16 ++ .../apache/drill/exec/record/RecordBatch.java | 4 +- 6 files changed, 174 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1726d734/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java index d2800bd..4478938 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java @@ -18,18 +18,13 @@ package org.apache.drill.exec.physical.impl.aggregate; import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; import java.util.List; import 
org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.expression.ErrorCollector; import org.apache.drill.common.expression.ErrorCollectorImpl; -import org.apache.drill.common.expression.ExpressionPosition; -import org.apache.drill.common.expression.FunctionCall; import org.apache.drill.common.expression.LogicalExpression; import org.apache.drill.common.logical.data.NamedExpression; -import org.apache.drill.common.types.TypeProtos; import org.apache.drill.exec.compile.sig.GeneratorMapping; import org.apache.drill.exec.compile.sig.MappingSet; import org.apache.drill.exec.exception.ClassTransformationException; @@ -39,11 +34,8 @@ import org.apache.drill.exec.expr.ClassGenerator; import org.apache.drill.exec.expr.ClassGenerator.BlockType; import org.apache.drill.exec.expr.ClassGenerator.HoldingContainer; import org.apache.drill.exec.expr.ExpressionTreeMaterializer; -import org.apache.drill.exec.expr.HoldingContainerExpression; import org.apache.drill.exec.expr.TypeHelper; import org.apache.drill.exec.expr.ValueVectorWriteExpression; -import org.apache.drill.exec.expr.holders.IntHolder; -import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.config.HashAggregate; import org.apache.drill.exec.record.AbstractRecordBatch; @@ -51,17 +43,12 @@ import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.TypedFieldId; -import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.selection.SelectionVector2; import org.apache.drill.exec.record.selection.SelectionVector4; import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.allocator.VectorAllocator; import org.apache.drill.exec.physical.impl.aggregate.HashAggregator.AggOutcome; -import 
org.apache.drill.exec.physical.impl.common.ChainedHashTable; -import org.apache.drill.exec.physical.impl.common.HashTable; -import org.apache.drill.exec.record.VectorWrapper; -import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.sun.codemodel.JExpr; import com.sun.codemodel.JVar; @@ -124,12 +111,16 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> { } } - if (aggregator.allFlushed()) { return IterOutcome.NONE; } - logger.debug("Starting aggregator doWork; incoming record count = {} ", incoming.getRecordCount()); + if (aggregator.buildComplete() && ! aggregator.allFlushed()) { + // aggregation is complete and not all records have been output yet + return aggregator.outputCurrentBatch(); + } + + logger.debug("Starting aggregator doWork; incoming record count = {} ", incoming.getRecordCount()); while(true){ AggOutcome out = aggregator.doWork(); @@ -284,6 +275,9 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> { @Override public void cleanup() { + if (aggregator != null) { + aggregator.cleanup(); + } super.cleanup(); incoming.cleanup(); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1726d734/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java index 039445b..b65acb0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java @@ -73,6 +73,7 @@ public abstract class HashAggTemplate implements HashAggregator { private IterOutcome outcome; private int outputCount = 0; private int numGroupedRecords = 0; + 
private int outBatchIndex = 0; private RecordBatch incoming; private BatchSchema schema; private RecordBatch outgoing; @@ -91,11 +92,13 @@ public abstract class HashAggTemplate implements HashAggregator { private MaterializedField[] materializedValueFields; private boolean allFlushed = false; + private boolean buildComplete = false; public class BatchHolder { private VectorContainer aggrValuesContainer; // container for aggr values (workspace variables) int maxOccupiedIdx = -1; + int batchOutputCount = 0; private BatchHolder() { @@ -120,15 +123,15 @@ public abstract class HashAggTemplate implements HashAggregator { return true; } - private void setup(int idx) { + private void setup() { setupInterior(incoming, outgoing, aggrValuesContainer); } private boolean outputValues() { for (int i = 0; i <= maxOccupiedIdx; i++) { - if (outputRecordValues(i, outputCount) ) { - if (EXTRA_DEBUG_2) logger.debug("Outputting values to {}", outputCount) ; - outputCount++; + if (outputRecordValues(i, batchOutputCount) ) { + if (EXTRA_DEBUG_2) logger.debug("Outputting values to batch index: {} output index: {}", batchOutputCount) ; + batchOutputCount++; } else { return false; } @@ -139,7 +142,15 @@ public abstract class HashAggTemplate implements HashAggregator { private void clear() { aggrValuesContainer.clear(); } + + private int getNumGroups() { + return maxOccupiedIdx + 1; + } + private int getOutputCount() { + return batchOutputCount; + } + // Code-generated methods (implemented in HashAggBatch) @RuntimeOverridden @@ -260,16 +271,29 @@ public abstract class HashAggTemplate implements HashAggregator { } case NONE: - outcome = out; - outputKeysAndValues() ; - - // cleanup my internal state since there is nothing more to return - this.cleanup(); + // outcome = out; + + buildComplete = true; + + // outputKeysAndValues() ; + + // output the first batch; remaining batches will be output + // in response to each next() call by a downstream operator + + // 
outputKeysAndValues(outBatchIndex); + outputCurrentBatch(); + + //if (isLastBatchOutput()) { + // cleanup my internal state since there is nothing more to return + // this.cleanup(); + // } + // cleanup incoming batch since output of aggregation does not need // any references to the incoming incoming.cleanup(); - return setOkAndReturn(); + // return setOkAndReturn(); + return AggOutcome.RETURN_OUTCOME; case STOP: default: @@ -286,24 +310,19 @@ public abstract class HashAggTemplate implements HashAggregator { if(first) first = !first; } } - - private void allocateOutgoing() { - - // At present, since we output all records at once, we create the outgoing batch - // with a size of numGroupedRecords..however this has to be restricted to max of 64K right - // now otherwise downstream operators will break. - // TODO: allow outputting arbitrarily large number of records in batches - assert (numGroupedRecords < Character.MAX_VALUE); + + private void allocateOutgoing(int numOutputRecords) { for (VectorAllocator a : keyAllocators) { - if(EXTRA_DEBUG_2) logger.debug("Outgoing batch: Allocating {} with {} records.", a, numGroupedRecords); - a.alloc(numGroupedRecords); + if(EXTRA_DEBUG_2) logger.debug("Outgoing batch: Allocating {} with {} records.", a, numOutputRecords); + a.alloc(numOutputRecords); } for (VectorAllocator a : valueAllocators) { - if(EXTRA_DEBUG_2) logger.debug("Outgoing batch: Allocating {} with {} records.", a, numGroupedRecords); - a.alloc(numGroupedRecords); + if(EXTRA_DEBUG_2) logger.debug("Outgoing batch: Allocating {} with {} records.", a, numOutputRecords); + a.alloc(numOutputRecords); } + } @Override @@ -314,20 +333,25 @@ public abstract class HashAggTemplate implements HashAggregator { @Override public int getOutputCount() { return outputCount; + // return batchHolders.get(outBatchIndex).getOutputCount(); } @Override public void cleanup(){ - htable.clear(); - htable = null; + if (htable != null) { + htable.clear(); + htable = null; + } htIdxHolder = 
null; materializedValueFields = null; - for (BatchHolder bh : batchHolders) { - bh.clear(); + if (batchHolders != null) { + for (BatchHolder bh : batchHolders) { + bh.clear(); + } + batchHolders.clear(); + batchHolders = null; } - batchHolders.clear(); - batchHolders = null; } private AggOutcome tooBigFailure(){ @@ -368,29 +392,114 @@ public abstract class HashAggTemplate implements HashAggregator { if (EXTRA_DEBUG_1) logger.debug("HashAggregate: Added new batch; num batches = {}.", batchHolders.size()); - int batchIdx = batchHolders.size() - 1; - bh.setup(batchIdx); + bh.setup(); } + /* private boolean outputKeysAndValues() { allocateOutgoing(); - this.htable.outputKeys(); + int batchIdx = 0; + for (BatchHolder bh : batchHolders) { + if (! this.htable.outputKeys(batchIdx++)) { + return false; + } + } for (BatchHolder bh : batchHolders) { if (! bh.outputValues() ) { return false; } } - + allFlushed = true ; return true; } +*/ + + // output the keys and values for a particular batch holder + private boolean outputKeysAndValues(int batchIdx) { + + allocateOutgoing(batchIdx); + + if (! this.htable.outputKeys(batchIdx)) { + return false; + } + if (! 
batchHolders.get(batchIdx).outputValues()) { + return false; + } + + outBatchIndex = batchIdx+1; + + if (outBatchIndex == batchHolders.size()) { + allFlushed = true; + } + + return true; + } + + public IterOutcome outputCurrentBatch() { + if (outBatchIndex >= batchHolders.size()) { + this.outcome = IterOutcome.NONE; + return outcome; + } + // get the number of groups in the batch holder corresponding to this batch index + int batchOutputRecords = batchHolders.get(outBatchIndex).getNumGroups(); + + if (batchOutputRecords == 0) { + this.outcome = IterOutcome.NONE; + return outcome; + } + + allocateOutgoing(batchOutputRecords); + + if (this.htable.outputKeys(outBatchIndex) + && batchHolders.get(outBatchIndex).outputValues()) { + + // set the value count for outgoing batch value vectors + for(VectorWrapper<?> v : outgoing) { + v.getValueVector().getMutator().setValueCount(batchOutputRecords); + } + + outputCount += batchOutputRecords; + + if(first){ + this.outcome = IterOutcome.OK_NEW_SCHEMA; + }else{ + this.outcome = IterOutcome.OK; + } + + logger.debug("HashAggregate: Output current batch index {} with {} records.", outBatchIndex, batchOutputRecords); + + outBatchIndex++; + if (outBatchIndex == batchHolders.size()) { + allFlushed = true; + + logger.debug("HashAggregate: All batches flushed."); + + // cleanup my internal state since there is nothing more to return + this.cleanup(); + } + } else { + this.outcome = IterOutcome.STOP; + } + + return this.outcome; + } + public boolean allFlushed() { return allFlushed; } + + public boolean buildComplete() { + return buildComplete; + } + + public int numGroupedRecords() { + return numGroupedRecords; + } // Check if a group is present in the hash table; if not, insert it in the hash table. 
// The htIdxHolder contains the index of the group in the hash table container; this same http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1726d734/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java index 9032f2a..9e6cdb9 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java @@ -57,4 +57,9 @@ public interface HashAggregator { public abstract void cleanup(); public abstract boolean allFlushed(); + + public abstract boolean buildComplete(); + + public abstract IterOutcome outputCurrentBatch(); + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1726d734/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java index e5959f2..46cb47d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java @@ -58,7 +58,7 @@ public interface HashTable { public void clear(); - public boolean outputKeys(); + public boolean outputKeys(int batchIdx); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1726d734/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java ---------------------------------------------------------------------- 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java index 3a8e609..f2844ac 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java @@ -157,6 +157,12 @@ public abstract class HashTableTemplate implements HashTable { int currentIdxWithinBatch = currentIdxHolder.value & BATCH_MASK; boolean match = false; + if (currentIdxWithinBatch >= HashTable.BATCH_SIZE) { + logger.debug("Batch size = {}, incomingRowIdx = {}, currentIdxWithinBatch = {}.", HashTable.BATCH_SIZE, incomingRowIdx, currentIdxWithinBatch); + } + assert (currentIdxWithinBatch < HashTable.BATCH_SIZE); + assert (incomingRowIdx < HashTable.BATCH_SIZE); + if (isProbe) match = isKeyMatchInternalProbe(incomingRowIdx, currentIdxWithinBatch); else @@ -599,6 +605,7 @@ public abstract class HashTableTemplate implements HashTable { } } + /* public boolean outputKeys() { for (BatchHolder bh : batchHolders) { if ( ! bh.outputKeys()) { @@ -607,7 +614,16 @@ public abstract class HashTableTemplate implements HashTable { } return true; } + */ + public boolean outputKeys(int batchIdx) { + assert batchIdx < batchHolders.size(); + if (! 
batchHolders.get(batchIdx).outputKeys()) { + return false; + } + return true; + } + private IntVector allocMetadataVector(int size, int initialValue) { IntVector vector = (IntVector) TypeHelper.getNewVector(dummyIntField, allocator); vector.allocateNew(size); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1726d734/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java index 60fdd4d..662deb6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java @@ -33,8 +33,8 @@ import org.apache.drill.exec.vector.ValueVector; */ public interface RecordBatch extends VectorAccessible { - /* max batch size, limited by 2-byte-lentgh in SV2 : 65535 = 2^16 -1 */ - public static final int MAX_BATCH_SIZE = 65535; + /* max batch size, limited by 2-byte-lentgh in SV2 : 65536 = 2^16 */ + public static final int MAX_BATCH_SIZE = 65536; /** * Describes the outcome of a RecordBatch being incremented forward.
