Repository: incubator-drill
Updated Branches:
  refs/heads/master 4e817d115 -> 84d23350c


DRILL-443: Fix potential memory leaks inside HashAggregation and HashTable.


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/84d23350
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/84d23350
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/84d23350

Branch: refs/heads/master
Commit: 84d23350ca278d84dcff48a47537db63707abad5
Parents: 4e817d1
Author: Aman Sinha <[email protected]>
Authored: Fri Mar 28 16:12:28 2014 -0700
Committer: Jacques Nadeau <[email protected]>
Committed: Sat Mar 29 15:15:40 2014 -0700

----------------------------------------------------------------------
 .../physical/impl/aggregate/HashAggBatch.java   |  8 +++-
 .../impl/aggregate/HashAggTemplate.java         | 47 ++++++++++++++------
 .../physical/impl/common/HashTableTemplate.java | 13 +++---
 .../exec/physical/impl/agg/TestHashAggr.java    |  8 ++--
 4 files changed, 50 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/84d23350/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
index d720390..9add544 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
@@ -121,17 +121,21 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
       }
     }
 
+
     if (aggregator.allFlushed()) {
       return IterOutcome.NONE;
     }
 
+    logger.debug("Starting aggregator doWork; incoming record count = {} ", 
incoming.getRecordCount());
+    
     while(true){
       AggOutcome out = aggregator.doWork();
       logger.debug("Aggregator response {}, records {}", out, 
aggregator.getOutputCount());
       switch(out){
       case CLEANUP_AND_RETURN:
-        container.zeroVectors();
-        aggregator.cleanup(); 
+        container.clear();
+        aggregator.cleanup();
+        incoming.cleanup();
         done = true;
         return aggregator.getOutcome();
       case RETURN_OUTCOME:

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/84d23350/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
index 3109124..21c0c7d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
@@ -40,6 +40,7 @@ import org.apache.drill.exec.physical.config.HashAggregate;
 import org.apache.drill.exec.physical.impl.common.ChainedHashTable;
 import org.apache.drill.exec.physical.impl.common.HashTable;
 import org.apache.drill.exec.physical.impl.common.HashTableConfig;
+import 
org.apache.drill.exec.physical.impl.common.HashTableTemplate.BatchHolder;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatch;
@@ -58,7 +59,8 @@ import com.google.common.collect.Lists;
 public abstract class HashAggTemplate implements HashAggregator {
   private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(HashAggregator.class);
   
-  private static final boolean EXTRA_DEBUG = false;
+  private static final boolean EXTRA_DEBUG_1 = false;
+  private static final boolean EXTRA_DEBUG_2 = false; 
   private static final String TOO_BIG_ERROR = "Couldn't add value to an empty 
batch.  This likely means that a single value is too long for a varlen field.";
   private boolean first = true;
   private boolean newSchema = false;
@@ -120,7 +122,7 @@ public abstract class HashAggTemplate implements HashAggregator {
     private boolean outputValues() { 
       for (int i = 0; i <= maxOccupiedIdx; i++) { 
         if (outputRecordValues(i, outputCount) ) {
-          if (EXTRA_DEBUG) logger.debug("Outputting values to {}", 
outputCount) ;
+          if (EXTRA_DEBUG_2) logger.debug("Outputting values to {}", 
outputCount) ;
           outputCount++;
         } else {
           return false;
@@ -129,6 +131,10 @@ public abstract class HashAggTemplate implements HashAggregator {
       return true;
     }
 
+    private void clear() {
+      aggrValuesContainer.clear();
+    }
+    
     // Code-generated methods (implemented in HashAggBatch)
 
     @RuntimeOverridden
@@ -199,25 +205,28 @@ public abstract class HashAggTemplate implements HashAggregator {
 
       outside: while(true) {
         // loop through existing records, aggregating the values as necessary.
+        if (EXTRA_DEBUG_1) logger.debug ("Starting outer loop of doWork()...");
         for (; underlyingIndex < incoming.getRecordCount(); incIndex()) {
-          if(EXTRA_DEBUG) logger.debug("Doing loop with values underlying {}, 
current {}", underlyingIndex, currentIndex);
+          if(EXTRA_DEBUG_2) logger.debug("Doing loop with values underlying 
{}, current {}", underlyingIndex, currentIndex);
           checkGroupAndAggrValues(currentIndex); 
         }
 
+        if (EXTRA_DEBUG_1) logger.debug("Processed {} records", 
underlyingIndex);
+        
         try{
 
           while(true){
             IterOutcome out = incoming.next();
-            if(EXTRA_DEBUG) logger.debug("Received IterOutcome of {}", out);
+            if(EXTRA_DEBUG_1) logger.debug("Received IterOutcome of {}", out);
             switch(out){
             case NOT_YET:
               this.outcome = out;
               return AggOutcome.RETURN_OUTCOME;
               
             case OK_NEW_SCHEMA:
-              if(EXTRA_DEBUG) logger.debug("Received new schema.  Batch has {} 
records.", incoming.getRecordCount());
-              newSchema = true;
-              
+              if(EXTRA_DEBUG_1) logger.debug("Received new schema.  Batch has 
{} records.", incoming.getRecordCount());
+              newSchema = true;              
+              this.cleanup();
               // TODO: new schema case needs to be handled appropriately
               return AggOutcome.UPDATE_AGGREGATOR;
 
@@ -229,14 +238,20 @@ public abstract class HashAggTemplate implements HashAggregator {
                 checkGroupAndAggrValues(currentIndex);
                 incIndex();
                 
-                if(EXTRA_DEBUG) logger.debug("Continuing outside loop");
+                if(EXTRA_DEBUG_1) logger.debug("Continuing outside loop");
                 continue outside;
               }
 
             case NONE:
               outcome = out;
               outputKeysAndValues() ; 
-
+              
+              // cleanup my internal state since there is nothing more to return
+              this.cleanup();
+              // cleanup incoming batch since output of aggregation does not need
+              // any references to the incoming
+              
+              incoming.cleanup();
               return setOkAndReturn();
 
             case STOP:
@@ -257,12 +272,12 @@ public abstract class HashAggTemplate implements HashAggregator {
 
   private void allocateOutgoing() {
     for (VectorAllocator a : keyAllocators) {
-      if(EXTRA_DEBUG) logger.debug("Outgoing batch: Allocating {} with {} 
records.", a, numGroupedRecords);
+      if(EXTRA_DEBUG_2) logger.debug("Outgoing batch: Allocating {} with {} 
records.", a, numGroupedRecords);
       a.alloc(numGroupedRecords);
     }
 
     for (VectorAllocator a : valueAllocators) {
-      if(EXTRA_DEBUG) logger.debug("Outgoing batch: Allocating {} with {} 
records.", a, numGroupedRecords);
+      if(EXTRA_DEBUG_2) logger.debug("Outgoing batch: Allocating {} with {} 
records.", a, numGroupedRecords);
       a.alloc(numGroupedRecords);
     }
   }
@@ -283,6 +298,10 @@ public abstract class HashAggTemplate implements HashAggregator {
     htable = null;
     htIdxHolder = null; 
     materializedValueFields = null;
+
+    for (BatchHolder bh : batchHolders) {
+      bh.clear();
+    }
     batchHolders.clear();
     batchHolders = null; 
   }
@@ -323,7 +342,7 @@ public abstract class HashAggTemplate implements HashAggregator {
     BatchHolder bh = new BatchHolder(); 
     batchHolders.add(bh);
 
-    if (EXTRA_DEBUG) logger.debug("HashAggregate: Added new batch; num batches 
= {}.", batchHolders.size());
+    if (EXTRA_DEBUG_1) logger.debug("HashAggregate: Added new batch; num 
batches = {}.", batchHolders.size());
 
     int batchIdx = batchHolders.size() - 1;
     bh.setup(batchIdx); 
@@ -382,7 +401,7 @@ public abstract class HashAggTemplate implements HashAggregator {
       int idxWithinBatch = currentIdx & HashTable.BATCH_MASK;
 
       if (putStatus == HashTable.PutStatus.KEY_PRESENT) {
-        if (EXTRA_DEBUG) logger.debug("Group-by key already present in hash 
table, updating the aggregate values");
+        if (EXTRA_DEBUG_2) logger.debug("Group-by key already present in hash 
table, updating the aggregate values");
 
         // debugging
         //if (holder.value == 100018 || holder.value == 100021) {
@@ -391,7 +410,7 @@ public abstract class HashAggTemplate implements HashAggregator {
 
       } 
       else if (putStatus == HashTable.PutStatus.KEY_ADDED) {
-        if (EXTRA_DEBUG) logger.debug("Group-by key was added to hash table, 
inserting new aggregate values") ;
+        if (EXTRA_DEBUG_2) logger.debug("Group-by key was added to hash table, 
inserting new aggregate values") ;
 
         // debugging
         // if (holder.value == 100018 || holder.value == 100021) {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/84d23350/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
index a4400f0..5a7a6fa 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
@@ -239,6 +239,9 @@ public abstract class HashTableTemplate implements HashTable {
 
       }      
 
+      links.clear();
+      hashValues.clear();
+      
       links = newLinks;
       hashValues = newHashValues;
     }
@@ -287,11 +290,9 @@ public abstract class HashTableTemplate implements HashTable {
     }
     
     private void clear() {
-      htContainer = null;
+      htContainer.clear();;
       links.clear();
-      links = null;
       hashValues.clear();
-      hashValues = null;
     }
 
     // These methods will be code-generated 
@@ -377,8 +378,7 @@ public abstract class HashTableTemplate implements HashTable {
     batchHolders.clear();
     batchHolders = null;
     startIndices.clear();
-    startIndices = null;
-    currentIdxHolder = null; 
+    currentIdxHolder = null;
     numEntries = 0;
   }
 
@@ -572,7 +572,8 @@ public abstract class HashTableTemplate implements HashTable {
       int batchStartIdx = i * BATCH_SIZE;
       bh.rehash(tableSize, newStartIndices, batchStartIdx);  
     }    
-   
+    
+    startIndices.clear();
     startIndices = newStartIndices;
 
     if (EXTRA_DEBUG) {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/84d23350/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggr.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggr.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggr.java
index 8401d7e..1ab7248 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggr.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggr.java
@@ -22,10 +22,9 @@ import org.apache.drill.BaseTestQuery;
 import org.junit.Ignore;
 import org.junit.Test;
 
-@Ignore // DRILL-443
+
 public class TestHashAggr extends BaseTestQuery{
 
-  
   @Test
   public void testQ6() throws Exception{
     testPhysicalFromFile("agg/hashagg/q6.json");
@@ -41,15 +40,16 @@ public class TestHashAggr extends BaseTestQuery{
     testPhysicalFromFile("agg/hashagg/q7_2.json");
   }
   
-  
+  @Ignore // ignore temporarily since this shows memory leak in ParquetRecordReader (DRILL-443)
   @Test
   public void testQ8_1() throws Exception{
     testPhysicalFromFile("agg/hashagg/q8_1.json");
   }
   
+  @Ignore // ignore temporarily since this shows memory leak in ParquetRecordReader (DRILL-443)
   @Test
   public void test8() throws Exception{
     testPhysicalFromFile("agg/hashagg/q8.json");
   }
-  
+
 }

Reply via email to