More stats fixes: Ensure we are calling the stats enabled next() method in the template classes for HashJoin, HashAggregate and StreamingAggregate.
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/57a86d4a Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/57a86d4a Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/57a86d4a Branch: refs/heads/master Commit: 57a86d4a2c434d997e610016f2fa266108799c93 Parents: fc1a777 Author: Aman Sinha <[email protected]> Authored: Fri Jun 13 18:23:53 2014 -0700 Committer: Jacques Nadeau <[email protected]> Committed: Mon Jun 16 08:04:44 2014 -0700 ---------------------------------------------------------------------- .../exec/physical/impl/aggregate/HashAggTemplate.java | 6 +++--- .../exec/physical/impl/aggregate/HashAggregator.java | 2 +- .../physical/impl/aggregate/StreamingAggTemplate.java | 6 +++--- .../physical/impl/aggregate/StreamingAggregator.java | 2 +- .../drill/exec/physical/impl/join/HashJoinBatch.java | 13 +++++-------- .../drill/exec/physical/impl/join/HashJoinHelper.java | 3 +++ .../drill/exec/physical/impl/join/HashJoinProbe.java | 2 +- .../exec/physical/impl/join/HashJoinProbeTemplate.java | 8 ++++++-- 8 files changed, 23 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/57a86d4a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java index 2fb3f02..f73d46c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java @@ -77,7 +77,7 @@ public abstract class HashAggTemplate implements HashAggregator { private int lastBatchOutputCount = 0; private RecordBatch incoming; private BatchSchema schema; - private RecordBatch outgoing; + private HashAggBatch outgoing; private VectorAllocator[] keyAllocators; private VectorAllocator[] valueAllocators; private FragmentContext context; @@ -166,7 +166,7 @@ public abstract class HashAggTemplate implements HashAggregator { @Override - public void setup(HashAggregate hashAggrConfig, FragmentContext context, BufferAllocator allocator, RecordBatch incoming, RecordBatch outgoing, + public void setup(HashAggregate hashAggrConfig, FragmentContext context, BufferAllocator allocator, RecordBatch incoming, HashAggBatch outgoing, LogicalExpression[] valueExprs, List<TypedFieldId> valueFieldIds, TypedFieldId[] groupByOutFieldIds, @@ -245,7 +245,7 @@ public abstract class HashAggTemplate implements HashAggregator { for (VectorWrapper<?> v : incoming) { v.getValueVector().clear(); } - IterOutcome out = incoming.next(); + IterOutcome out = outgoing.next(0, incoming); if(EXTRA_DEBUG_1) logger.debug("Received IterOutcome of {}", out); switch(out){ case NOT_YET: http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/57a86d4a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java index 9e6cdb9..641d377 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java @@ -42,7 +42,7 @@ public interface HashAggregator { } public abstract void setup(HashAggregate hashAggrConfig, FragmentContext context, BufferAllocator allocator, RecordBatch incoming, - RecordBatch outgoing, LogicalExpression[] valueExprs, + HashAggBatch outgoing, LogicalExpression[] valueExprs, List<TypedFieldId> valueFieldIds, TypedFieldId[] keyFieldIds, VectorAllocator[] keyAllocators, VectorAllocator[] valueAllocators) http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/57a86d4a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java index 65d67dc..e73f21b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java @@ -43,14 +43,14 @@ public abstract class StreamingAggTemplate implements StreamingAggregator { private int outputCount = 0; private RecordBatch incoming; private BatchSchema schema; - private RecordBatch outgoing; + private StreamingAggBatch outgoing; private VectorAllocator[] allocators; private FragmentContext context; private InternalBatch remainderBatch; @Override - public void setup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, VectorAllocator[] allocators) throws SchemaChangeException { + public void setup(FragmentContext context, RecordBatch incoming, StreamingAggBatch outgoing, VectorAllocator[] allocators) throws SchemaChangeException { this.allocators = allocators; this.context = context; this.incoming = incoming; @@ -159,7 +159,7 @@ public abstract class StreamingAggTemplate implements StreamingAggregator { try{ while(true){ previous = new InternalBatch(incoming); - IterOutcome out = incoming.next(); + IterOutcome out = outgoing.next(0, incoming); if(EXTRA_DEBUG) logger.debug("Received IterOutcome of {}", out); switch(out){ case NONE: http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/57a86d4a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java index 7c10e9d..52f30ae 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java @@ -32,7 +32,7 @@ public interface StreamingAggregator { RETURN_OUTCOME, CLEANUP_AND_RETURN, UPDATE_AGGREGATOR; } - public abstract void setup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, + public abstract void setup(FragmentContext context, RecordBatch incoming, StreamingAggBatch outgoing, VectorAllocator[] allocators) throws SchemaChangeException; public abstract IterOutcome getOutcome(); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/57a86d4a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java index 684965d..ddc31ee 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java @@ -63,9 +63,6 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> { public static final long ALLOCATOR_INITIAL_RESERVATION = 1*1024*1024; public static final long ALLOCATOR_MAX_RESERVATION = 20L*1000*1000*1000; - private static final int LEFT_INPUT = 0; - private static final int RIGHT_INPUT = 1; - // Probe side record batch private final RecordBatch left; @@ -159,7 +156,7 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> { * as well, for the materialization to be successful. This batch will not be used * till we complete the build phase. */ - leftUpstream = next(LEFT_INPUT, left); + leftUpstream = next(HashJoinHelper.LEFT_INPUT, left); // Build the hash table, using the build side record batches. executeBuildPhase(); @@ -207,12 +204,12 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> { for (VectorWrapper<?> wrapper : left) { wrapper.getValueVector().clear(); } - leftUpstream = next(LEFT_INPUT, left); + leftUpstream = next(HashJoinHelper.LEFT_INPUT, left); while (leftUpstream == IterOutcome.OK_NEW_SCHEMA || leftUpstream == IterOutcome.OK) { for (VectorWrapper<?> wrapper : left) { wrapper.getValueVector().clear(); } - leftUpstream = next(LEFT_INPUT, left); + leftUpstream = next(HashJoinHelper.LEFT_INPUT, left); } } } @@ -263,7 +260,7 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> { public void executeBuildPhase() throws SchemaChangeException, ClassTransformationException, IOException { //Setup the underlying hash table - IterOutcome rightUpstream = next(RIGHT_INPUT, right); + IterOutcome rightUpstream = next(HashJoinHelper.RIGHT_INPUT, right); boolean moreData = true; @@ -330,7 +327,7 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> { break; } // Get the next record batch - rightUpstream = next(RIGHT_INPUT, right); + rightUpstream = next(HashJoinHelper.RIGHT_INPUT, right); } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/57a86d4a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelper.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelper.java index a634827..aec0f31 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelper.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelper.java @@ -69,6 +69,9 @@ public class HashJoinHelper { // bits to shift while obtaining batch index from SV4 static final int SHIFT_SIZE = 16; + public static final int LEFT_INPUT = 0; + public static final int RIGHT_INPUT = 1; + public HashJoinHelper(FragmentContext context, BufferAllocator allocator) { this.context = context; this.allocator = allocator; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/57a86d4a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java index 6d20f60..ae70339 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java @@ -48,7 +48,7 @@ public interface HashJoinProbe { } public abstract void setupHashJoinProbe(FragmentContext context, VectorContainer buildBatch, RecordBatch probeBatch, - int probeRecordCount, RecordBatch outgoing, HashTable hashTable, HashJoinHelper hjHelper, + int probeRecordCount, HashJoinBatch outgoing, HashTable hashTable, HashJoinHelper hjHelper, JoinRelType joinRelType); public abstract void doSetup(FragmentContext context, VectorContainer buildBatch, RecordBatch probeBatch, RecordBatch outgoing); public abstract int probeAndProject() throws SchemaChangeException, ClassTransformationException, IOException; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/57a86d4a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java index 2dec9ff..a4cc662 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java @@ -27,6 +27,7 @@ import org.apache.drill.exec.record.VectorWrapper; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.RecordBatch.IterOutcome; import org.apache.drill.exec.record.VectorContainer; +import org.apache.drill.exec.record.AbstractRecordBatch; import org.eigenbase.rel.JoinRelType; @@ -40,6 +41,8 @@ public abstract class HashJoinProbeTemplate implements HashJoinProbe { // Join type, INNER, LEFT, RIGHT or OUTER private JoinRelType joinType; + + private HashJoinBatch outgoingJoinBatch = null; /* Helper class * Maintains linked list of build side records with the same key @@ -74,7 +77,7 @@ public abstract class HashJoinProbeTemplate implements HashJoinProbe { @Override public void setupHashJoinProbe(FragmentContext context, VectorContainer buildBatch, RecordBatch probeBatch, - int probeRecordCount, RecordBatch outgoing, HashTable hashTable, + int probeRecordCount, HashJoinBatch outgoing, HashTable hashTable, HashJoinHelper hjHelper, JoinRelType joinRelType) { this.probeBatch = probeBatch; @@ -82,6 +85,7 @@ public abstract class HashJoinProbeTemplate implements HashJoinProbe { this.recordsToProcess = probeRecordCount; this.hashTable = hashTable; this.hjHelper = hjHelper; + this.outgoingJoinBatch = outgoing; doSetup(context, buildBatch, probeBatch, outgoing); } @@ -104,7 +108,7 @@ public abstract class HashJoinProbeTemplate implements HashJoinProbe { wrapper.getValueVector().clear(); } - IterOutcome leftUpstream = probeBatch.next(); + IterOutcome leftUpstream = outgoingJoinBatch.next(HashJoinHelper.LEFT_INPUT, probeBatch); switch (leftUpstream) { case NONE:
