More stats fixes: Ensure we are calling the stats enabled next() method in the 
template classes for HashJoin, HashAggregate and StreamingAggregate.


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/57a86d4a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/57a86d4a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/57a86d4a

Branch: refs/heads/master
Commit: 57a86d4a2c434d997e610016f2fa266108799c93
Parents: fc1a777
Author: Aman Sinha <[email protected]>
Authored: Fri Jun 13 18:23:53 2014 -0700
Committer: Jacques Nadeau <[email protected]>
Committed: Mon Jun 16 08:04:44 2014 -0700

----------------------------------------------------------------------
 .../exec/physical/impl/aggregate/HashAggTemplate.java  |  6 +++---
 .../exec/physical/impl/aggregate/HashAggregator.java   |  2 +-
 .../physical/impl/aggregate/StreamingAggTemplate.java  |  6 +++---
 .../physical/impl/aggregate/StreamingAggregator.java   |  2 +-
 .../drill/exec/physical/impl/join/HashJoinBatch.java   | 13 +++++--------
 .../drill/exec/physical/impl/join/HashJoinHelper.java  |  3 +++
 .../drill/exec/physical/impl/join/HashJoinProbe.java   |  2 +-
 .../exec/physical/impl/join/HashJoinProbeTemplate.java |  8 ++++++--
 8 files changed, 23 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/57a86d4a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
index 2fb3f02..f73d46c 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
@@ -77,7 +77,7 @@ public abstract class HashAggTemplate implements 
HashAggregator {
   private int lastBatchOutputCount = 0;
   private RecordBatch incoming;
   private BatchSchema schema;
-  private RecordBatch outgoing;
+  private HashAggBatch outgoing;
   private VectorAllocator[] keyAllocators;
   private VectorAllocator[] valueAllocators;
   private FragmentContext context;
@@ -166,7 +166,7 @@ public abstract class HashAggTemplate implements 
HashAggregator {
 
 
   @Override
-  public void setup(HashAggregate hashAggrConfig, FragmentContext context, 
BufferAllocator allocator, RecordBatch incoming, RecordBatch outgoing,
+  public void setup(HashAggregate hashAggrConfig, FragmentContext context, 
BufferAllocator allocator, RecordBatch incoming, HashAggBatch outgoing,
                     LogicalExpression[] valueExprs,
                     List<TypedFieldId> valueFieldIds,
                     TypedFieldId[] groupByOutFieldIds,
@@ -245,7 +245,7 @@ public abstract class HashAggTemplate implements 
HashAggregator {
             for (VectorWrapper<?> v : incoming) {
               v.getValueVector().clear();
             }
-            IterOutcome out = incoming.next();
+            IterOutcome out = outgoing.next(0, incoming);
             if(EXTRA_DEBUG_1) logger.debug("Received IterOutcome of {}", out);
             switch(out){
             case NOT_YET:

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/57a86d4a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
index 9e6cdb9..641d377 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
@@ -42,7 +42,7 @@ public interface HashAggregator {
          }
   
   public abstract void setup(HashAggregate hashAggrConfig, FragmentContext 
context, BufferAllocator allocator, RecordBatch incoming,
-                             RecordBatch outgoing, LogicalExpression[] 
valueExprs, 
+                             HashAggBatch outgoing, LogicalExpression[] 
valueExprs, 
                              List<TypedFieldId> valueFieldIds,
                              TypedFieldId[] keyFieldIds,
                              VectorAllocator[] keyAllocators, 
VectorAllocator[] valueAllocators)

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/57a86d4a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
index 65d67dc..e73f21b 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
@@ -43,14 +43,14 @@ public abstract class StreamingAggTemplate implements 
StreamingAggregator {
   private int outputCount = 0;
   private RecordBatch incoming;
   private BatchSchema schema;
-  private RecordBatch outgoing;
+  private StreamingAggBatch outgoing;
   private VectorAllocator[] allocators;
   private FragmentContext context;
   private InternalBatch remainderBatch;
 
 
   @Override
-  public void setup(FragmentContext context, RecordBatch incoming, RecordBatch 
outgoing, VectorAllocator[] allocators) throws SchemaChangeException {
+  public void setup(FragmentContext context, RecordBatch incoming, 
StreamingAggBatch outgoing, VectorAllocator[] allocators) throws 
SchemaChangeException {
     this.allocators = allocators;
     this.context = context;
     this.incoming = incoming;
@@ -159,7 +159,7 @@ public abstract class StreamingAggTemplate implements 
StreamingAggregator {
         try{
           while(true){
             previous = new InternalBatch(incoming);
-            IterOutcome out = incoming.next();
+            IterOutcome out = outgoing.next(0, incoming);
             if(EXTRA_DEBUG) logger.debug("Received IterOutcome of {}", out);
             switch(out){
             case NONE:

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/57a86d4a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java
index 7c10e9d..52f30ae 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java
@@ -32,7 +32,7 @@ public interface StreamingAggregator {
            RETURN_OUTCOME, CLEANUP_AND_RETURN, UPDATE_AGGREGATOR;
          }
   
-  public abstract void setup(FragmentContext context, RecordBatch incoming, 
RecordBatch outgoing,
+  public abstract void setup(FragmentContext context, RecordBatch incoming, 
StreamingAggBatch outgoing,
       VectorAllocator[] allocators) throws SchemaChangeException;
 
   public abstract IterOutcome getOutcome();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/57a86d4a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 684965d..ddc31ee 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -63,9 +63,6 @@ public class HashJoinBatch extends 
AbstractRecordBatch<HashJoinPOP> {
   public static final long ALLOCATOR_INITIAL_RESERVATION = 1*1024*1024;
   public static final long ALLOCATOR_MAX_RESERVATION = 20L*1000*1000*1000;
 
-    private static final int LEFT_INPUT = 0;
-    private static final int RIGHT_INPUT = 1;
-
     // Probe side record batch
     private final RecordBatch left;
 
@@ -159,7 +156,7 @@ public class HashJoinBatch extends 
AbstractRecordBatch<HashJoinPOP> {
                  * as well, for the materialization to be successful. This 
batch will not be used
                  * till we complete the build phase.
                  */
-                leftUpstream = next(LEFT_INPUT, left);
+                leftUpstream = next(HashJoinHelper.LEFT_INPUT, left);
 
                 // Build the hash table, using the build side record batches.
                 executeBuildPhase();
@@ -207,12 +204,12 @@ public class HashJoinBatch extends 
AbstractRecordBatch<HashJoinPOP> {
                     for (VectorWrapper<?> wrapper : left) {
                       wrapper.getValueVector().clear();
                     }
-                    leftUpstream = next(LEFT_INPUT, left);
+                    leftUpstream = next(HashJoinHelper.LEFT_INPUT, left);
                     while (leftUpstream == IterOutcome.OK_NEW_SCHEMA || 
leftUpstream == IterOutcome.OK) {
                       for (VectorWrapper<?> wrapper : left) {
                         wrapper.getValueVector().clear();
                       }
-                      leftUpstream = next(LEFT_INPUT, left);
+                      leftUpstream = next(HashJoinHelper.LEFT_INPUT, left);
                     }
                 }
             }
@@ -263,7 +260,7 @@ public class HashJoinBatch extends 
AbstractRecordBatch<HashJoinPOP> {
     public void executeBuildPhase() throws SchemaChangeException, 
ClassTransformationException, IOException {
 
         //Setup the underlying hash table
-        IterOutcome rightUpstream = next(RIGHT_INPUT, right);
+        IterOutcome rightUpstream = next(HashJoinHelper.RIGHT_INPUT, right);
 
         boolean moreData = true;
 
@@ -330,7 +327,7 @@ public class HashJoinBatch extends 
AbstractRecordBatch<HashJoinPOP> {
                     break;
             }
             // Get the next record batch
-            rightUpstream = next(RIGHT_INPUT, right);
+            rightUpstream = next(HashJoinHelper.RIGHT_INPUT, right);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/57a86d4a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelper.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelper.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelper.java
index a634827..aec0f31 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelper.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelper.java
@@ -69,6 +69,9 @@ public class HashJoinHelper {
   // bits to shift while obtaining batch index from SV4
   static final int SHIFT_SIZE = 16;
 
+  public static final int LEFT_INPUT = 0;
+  public static final int RIGHT_INPUT = 1;
+  
   public HashJoinHelper(FragmentContext context, BufferAllocator allocator) {
     this.context = context;
     this.allocator = allocator;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/57a86d4a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java
index 6d20f60..ae70339 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java
@@ -48,7 +48,7 @@ public interface HashJoinProbe {
   }
 
   public abstract void setupHashJoinProbe(FragmentContext context, 
VectorContainer buildBatch, RecordBatch probeBatch,
-                                          int probeRecordCount, RecordBatch 
outgoing, HashTable hashTable, HashJoinHelper hjHelper,
+                                          int probeRecordCount, HashJoinBatch 
outgoing, HashTable hashTable, HashJoinHelper hjHelper,
                                           JoinRelType joinRelType);
   public abstract void doSetup(FragmentContext context, VectorContainer 
buildBatch, RecordBatch probeBatch, RecordBatch outgoing);
   public abstract int  probeAndProject() throws SchemaChangeException, 
ClassTransformationException, IOException;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/57a86d4a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
index 2dec9ff..a4cc662 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
@@ -27,6 +27,7 @@ import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.RecordBatch.IterOutcome;
 import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.AbstractRecordBatch;
 
 import org.eigenbase.rel.JoinRelType;
 
@@ -40,6 +41,8 @@ public abstract class HashJoinProbeTemplate implements 
HashJoinProbe {
 
   // Join type, INNER, LEFT, RIGHT or OUTER
   private JoinRelType joinType;
+  
+  private HashJoinBatch outgoingJoinBatch = null;
 
   /* Helper class
    * Maintains linked list of build side records with the same key
@@ -74,7 +77,7 @@ public abstract class HashJoinProbeTemplate implements 
HashJoinProbe {
 
   @Override
   public void setupHashJoinProbe(FragmentContext context, VectorContainer 
buildBatch, RecordBatch probeBatch,
-                                 int probeRecordCount, RecordBatch outgoing, 
HashTable hashTable,
+                                 int probeRecordCount, HashJoinBatch outgoing, 
HashTable hashTable,
                                  HashJoinHelper hjHelper, JoinRelType 
joinRelType) {
 
     this.probeBatch = probeBatch;
@@ -82,6 +85,7 @@ public abstract class HashJoinProbeTemplate implements 
HashJoinProbe {
     this.recordsToProcess = probeRecordCount;
     this.hashTable = hashTable;
     this.hjHelper = hjHelper;
+    this.outgoingJoinBatch = outgoing;
 
     doSetup(context, buildBatch, probeBatch, outgoing);
   }
@@ -104,7 +108,7 @@ public abstract class HashJoinProbeTemplate implements 
HashJoinProbe {
           wrapper.getValueVector().clear();
         }
 
-        IterOutcome leftUpstream = probeBatch.next();
+        IterOutcome leftUpstream = 
outgoingJoinBatch.next(HashJoinHelper.LEFT_INPUT, probeBatch);
 
         switch (leftUpstream) {
           case NONE:

Reply via email to