Improve OperatorStats to avoid leaking state.  Fix issue where HashJoinBatch 
throws NPE in stats tracking if we don't have HashTable.


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/894037ab
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/894037ab
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/894037ab

Branch: refs/heads/master
Commit: 894037ab693dea425e88fb3ec3aff73ea5b15eb1
Parents: 2899288
Author: Jacques Nadeau <[email protected]>
Authored: Tue Jun 17 19:41:15 2014 -0700
Committer: Jacques Nadeau <[email protected]>
Committed: Tue Jun 17 20:18:37 2014 -0700

----------------------------------------------------------------------
 .../org/apache/drill/exec/ops/OperatorStats.java     | 15 +++++++++------
 .../drill/exec/physical/impl/WriterRecordBatch.java  |  3 ++-
 .../exec/physical/impl/aggregate/HashAggBatch.java   |  4 ++--
 .../physical/impl/aggregate/StreamingAggBatch.java   |  3 ++-
 .../drill/exec/physical/impl/join/HashJoinBatch.java | 13 +++++++------
 .../exec/physical/impl/join/MergeJoinBatch.java      |  4 ++--
 .../drill/exec/record/AbstractRecordBatch.java       | 10 +++++++---
 7 files changed, 31 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/894037ab/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java
index 7d1e9dc..dcb73c8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java
@@ -66,41 +66,44 @@ public class OperatorStats {
     this.schemaCountByInput = new long[inputCount];
   }
 
+  private String assertionError(String msg){
+    return String.format("Failure while %s for operator id %d. Currently have 
states of processing:%s, setup:%s, waiting:%s.", msg, operatorId, inProcessing, 
inSetup, inWait);
+  }
   public void startSetup() {
-    assert !inSetup  : "Failure while starting setup.  Currently in setup.";
+    assert !inSetup  : assertionError("starting setup");
     stopProcessing();
     inSetup = true;
     setupMark = System.nanoTime();
   }
 
   public void stopSetup() {
-    assert inSetup :  "Failure while stopping setup.  Not currently in setup.";
+    assert inSetup :  assertionError("stopping setup");
     startProcessing();
     setupNanos += System.nanoTime() - setupMark;
     inSetup = false;
   }
 
   public void startProcessing() {
-    assert !inProcessing : "Failure while starting processing.  Currently in 
processing.";
+    assert !inProcessing : assertionError("starting processing");
     processingMark = System.nanoTime();
     inProcessing = true;
   }
 
   public void stopProcessing() {
-    assert inProcessing : "Failure while stopping processing.  Not currently 
in processing.";
+    assert inProcessing : assertionError("stopping processing");
     processingNanos += System.nanoTime() - processingMark;
     inProcessing = false;
   }
 
   public void startWait() {
-    assert !inWait : "Failure while starting waiting.  Currently in waiting.";
+    assert !inWait : assertionError("starting waiting");
     stopProcessing();
     inWait = true;
     waitMark = System.nanoTime();
   }
 
   public void stopWait() {
-    assert inWait : "Failure while stopping waiting.  Currently not in 
waiting.";
+    assert inWait : assertionError("stopping waiting");
     startProcessing();
     waitNanos += System.nanoTime() - waitMark;
     inWait = false;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/894037ab/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
index 2dae853..43e0dd4 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
@@ -156,9 +156,10 @@ public class WriterRecordBatch extends 
AbstractRecordBatch<Writer> {
       // update the schema in RecordWriter
       stats.startSetup();
       recordWriter.updateSchema(incoming.getSchema());
-      stats.stopSetup();
     } catch(IOException ex) {
       throw new RuntimeException("Failed to update schema in RecordWriter", 
ex);
+    } finally{
+      stats.stopSetup();
     }
 
     eventBasedRecordWriter = new EventBasedRecordWriter(incoming.getSchema(),

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/894037ab/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
index 8250682..dd58562 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
@@ -154,14 +154,14 @@ public class HashAggBatch extends 
AbstractRecordBatch<HashAggregate> {
     try{
       stats.startSetup();
       this.aggregator = createAggregatorInternal();
-      stats.stopSetup();
       return true;
     }catch(SchemaChangeException | ClassTransformationException | IOException 
ex){
-      stats.stopSetup();
       context.fail(ex);
       container.clear();
       incoming.kill();
       return false;
+    }finally{
+      stats.stopSetup();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/894037ab/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
index 8cad91b..ec12de9 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
@@ -136,13 +136,14 @@ public class StreamingAggBatch extends 
AbstractRecordBatch<StreamingAggregate> {
     try{
       stats.startSetup();
       this.aggregator = createAggregatorInternal();
-      stats.stopSetup();
       return true;
     }catch(SchemaChangeException | ClassTransformationException | IOException 
ex){
       context.fail(ex);
       container.clear();
       incoming.kill();
       return false;
+    }finally{
+      stats.stopSetup();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/894037ab/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 9343912..c43b99a 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -134,8 +134,8 @@ public class HashJoinBatch extends 
AbstractRecordBatch<HashJoinPOP> {
     boolean firstOutputBatch = true;
 
     IterOutcome leftUpstream = IterOutcome.NONE;
-    
-    private HashTableStats htStats = new HashTableStats();
+
+    private final HashTableStats htStats = new HashTableStats();
 
     @Override
     public int getRecordCount() {
@@ -171,9 +171,9 @@ public class HashJoinBatch extends 
AbstractRecordBatch<HashJoinPOP> {
                 // Create the run time generated code needed to probe and 
project
                 hashJoinProbe = setupHashJoinProbe();
             }
-                        
+
             // Store the number of records projected
-            if (hashTable != null 
+            if (hashTable != null
                 || joinType != JoinRelType.INNER) {
 
                 // Allocate the memory for the vectors in the output container
@@ -440,12 +440,13 @@ public class HashJoinBatch extends 
AbstractRecordBatch<HashJoinPOP> {
     }
 
     private void updateStats(HashTable htable) {
+      if(htable == null) return;
       htable.getStats(htStats);
       this.stats.addLongStat(HashTableMetrics.HTABLE_NUM_BUCKETS, 
htStats.numBuckets);
       this.stats.addLongStat(HashTableMetrics.HTABLE_NUM_ENTRIES, 
htStats.numEntries);
-      this.stats.addLongStat(HashTableMetrics.HTABLE_NUM_RESIZING, 
htStats.numResizing);  
+      this.stats.addLongStat(HashTableMetrics.HTABLE_NUM_RESIZING, 
htStats.numResizing);
     }
-    
+
     @Override
     public void killIncoming() {
         this.left.kill();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/894037ab/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
index 84f8354..e32b653 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
@@ -166,12 +166,12 @@ public class MergeJoinBatch extends 
AbstractRecordBatch<MergeJoinPOP> {
           stats.startSetup();
           this.worker = generateNewWorker();
           first = true;
-          stats.stopSetup();
         } catch (ClassTransformationException | IOException | 
SchemaChangeException e) {
-          stats.stopSetup();
           context.fail(new SchemaChangeException(e));
           kill();
           return IterOutcome.STOP;
+        } finally {
+          stats.stopSetup();
         }
       }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/894037ab/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
index 72a7d3b..4c1f82d 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
@@ -66,9 +66,13 @@ public abstract class AbstractRecordBatch<T extends 
PhysicalOperator> implements
   }
 
   public final IterOutcome next(int inputIndex, RecordBatch b){
+    IterOutcome next = null;
     stats.stopProcessing();
-    IterOutcome next = b.next();
-    stats.startProcessing();
+    try{
+      next = b.next();
+    }finally{
+      stats.startProcessing();
+    }
 
     switch(next){
     case OK_NEW_SCHEMA:
@@ -138,7 +142,7 @@ public abstract class AbstractRecordBatch<T extends 
PhysicalOperator> implements
     return batch;
 
   }
-  
+
   @Override
   public VectorContainer getOutgoingContainer() {
     throw new UnsupportedOperationException(String.format(" You should not 
call getOutgoingContainer() for class %s", this.getClass().getCanonicalName()));

Reply via email to