Fix issue introduced by DRILL-1202 where allocators are being closed after 
reporting success.
Update ScreenRoot to cleanup before returning success.
Update ScanBatch to cleanup reader in case of limit query to avoid memory leak 
in ParquetReader.
Update allocators so that we don't have memory leak when using debug options.
Update project record batch so that it doesn't try to return a released 
remainder.


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/15eeb9d3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/15eeb9d3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/15eeb9d3

Branch: refs/heads/master
Commit: 15eeb9d38af517bc294d6c37c3f7dae82a33665f
Parents: 4216e0e
Author: Jacques Nadeau <[email protected]>
Authored: Sun Aug 24 10:04:47 2014 -0700
Committer: Jacques Nadeau <[email protected]>
Committed: Sun Aug 24 10:42:50 2014 -0700

----------------------------------------------------------------------
 .../drill/exec/memory/TopLevelAllocator.java    |  9 +++--
 .../apache/drill/exec/ops/FragmentContext.java  |  2 +-
 .../drill/exec/physical/impl/ScanBatch.java     |  4 +++
 .../drill/exec/physical/impl/ScreenCreator.java | 21 ++++++++----
 .../impl/project/ProjectRecordBatch.java        |  8 +++++
 .../exec/work/fragment/FragmentExecutor.java    | 36 +++++++++++++++-----
 .../org/apache/drill/TestExampleQueries.java    | 13 +++++++
 7 files changed, 73 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/15eeb9d3/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java
index 098922c..d89c892 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java
@@ -98,7 +98,7 @@ public class TopLevelAllocator implements BufferAllocator {
       throw new OutOfMemoryException(String.format("You attempted to create a 
new child allocator with initial reservation %d but only %d bytes of memory 
were available.", initialReservation, acct.getCapacity() - 
acct.getAllocation()));
     };
     logger.debug("New child allocator with initial reservation {}", 
initialReservation);
-    ChildAllocator allocator = new ChildAllocator(handle, acct, 
maximumReservation, initialReservation);
+    ChildAllocator allocator = new ChildAllocator(handle, acct, 
maximumReservation, initialReservation, childrenMap);
     if(ENABLE_ACCOUNTING) childrenMap.put(allocator, 
Thread.currentThread().getStackTrace());
 
     return allocator;
@@ -130,11 +130,13 @@ public class TopLevelAllocator implements BufferAllocator 
{
     private Map<ChildAllocator, StackTraceElement[]> children = new 
HashMap<>();
     private boolean closed = false;
     private FragmentHandle handle;
+    private Map<ChildAllocator, StackTraceElement[]> thisMap;
 
-    public ChildAllocator(FragmentHandle handle, Accountor parentAccountor, 
long max, long pre) throws OutOfMemoryException{
+    public ChildAllocator(FragmentHandle handle, Accountor parentAccountor, 
long max, long pre, Map<ChildAllocator, StackTraceElement[]> map) throws 
OutOfMemoryException{
       assert max >= pre;
       childAcct = new Accountor(errorOnLeak, handle, parentAccountor, max, 
pre);
       this.handle = handle;
+      thisMap = map;
     }
 
     @Override
@@ -171,7 +173,7 @@ public class TopLevelAllocator implements BufferAllocator {
         throw new OutOfMemoryException(String.format("You attempted to create 
a new child allocator with initial reservation %d but only %d bytes of memory 
were available.", initialReservation, childAcct.getAvailable()));
       };
       logger.debug("New child allocator with initial reservation {}", 
initialReservation);
-      ChildAllocator newChildAllocator = new ChildAllocator(handle, childAcct, 
maximumReservation, initialReservation);
+      ChildAllocator newChildAllocator = new ChildAllocator(handle, childAcct, 
maximumReservation, initialReservation, null);
       this.children.put(newChildAllocator, 
Thread.currentThread().getStackTrace());
       return newChildAllocator;
     }
@@ -183,6 +185,7 @@ public class TopLevelAllocator implements BufferAllocator {
     @Override
     public void close() {
       if (ENABLE_ACCOUNTING) {
+        if(thisMap != null) thisMap.remove(this);
         for (ChildAllocator child : children.keySet()) {
           if (!child.isClosed()) {
             StringBuilder sb = new StringBuilder();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/15eeb9d3/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
index 8a670fb..6fccb3b 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
@@ -111,7 +111,7 @@ public class FragmentContext implements Closeable {
   }
 
   public void fail(Throwable cause) {
-    logger.error("Fragment Context received failure. {}", cause);
+    logger.error("Fragment Context received failure.", cause);
     failed = true;
     failureCause = cause;
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/15eeb9d3/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index 4b7e33e..9472627 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -129,6 +129,10 @@ public class ScanBatch implements RecordBatch {
 
   @Override
   public void kill(boolean sendUpstream) {
+    if(currentReader != null){
+      currentReader.cleanup();
+    }
+
     if (sendUpstream) {
       done = true;
     } else {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/15eeb9d3/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
index 279374f..d96bdf3 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
@@ -99,7 +99,7 @@ public class ScreenCreator implements RootCreator<Screen>{
 //      logger.debug("Screen Outcome {}", outcome);
       switch(outcome){
       case STOP: {
-          sendCount.waitForSendComplete();
+        this.internalStop();
         boolean verbose = 
context.getOptions().getOption(ExecConstants.ENABLE_VERBOSE_ERRORS_KEY).bool_val;
         QueryResult header = QueryResult.newBuilder() //
               .setQueryId(context.getHandle().getQueryId()) //
@@ -122,8 +122,7 @@ public class ScreenCreator implements RootCreator<Screen>{
           return false;
       }
       case NONE: {
-        sendCount.waitForSendComplete();
-//        context.getStats().batchesCompleted.inc(1);
+        this.internalStop();
         QueryWritableBatch batch;
         if (!first) {
           QueryResult header = QueryResult.newBuilder() //
@@ -168,22 +167,30 @@ public class ScreenCreator implements RootCreator<Screen>{
         throw new UnsupportedOperationException();
       }
     }
-    
+
     public void updateStats(QueryWritableBatch queryBatch) {
       stats.addLongStat(Metric.BYTES_SENT, queryBatch.getByteCount());
     }
 
-    @Override
-    public void stop() {
+
+    private void internalStop(){
       sendCount.waitForSendComplete();
       oContext.close();
       incoming.cleanup();
     }
 
+    @Override
+    public void stop() {
+      if(!oContext.isClosed()){
+        internalStop();
+      }
+      sendCount.waitForSendComplete();
+    }
+
     private SendListener listener = new SendListener();
 
     private class SendListener extends BaseRpcOutcomeListener<Ack>{
-      volatile RpcException ex; 
+      volatile RpcException ex;
 
 
       @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/15eeb9d3/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index 4fdb71d..a7e1360 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -111,6 +111,14 @@ public class ProjectRecordBatch extends 
AbstractSingleRecordBatch<Project>{
     return recordCount;
   }
 
+
+  @Override
+  protected void killIncoming(boolean sendUpstream) {
+    super.killIncoming(sendUpstream);
+    hasRemainder = false;
+  }
+
+
   @Override
   public IterOutcome innerNext() {
     if (hasRemainder) {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/15eeb9d3/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
index e4941d0..2a8db67 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
@@ -17,9 +17,9 @@
  */
 package org.apache.drill.exec.work.fragment;
 
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.base.FragmentRoot;
 import org.apache.drill.exec.physical.impl.ImplCreator;
@@ -31,8 +31,6 @@ import org.apache.drill.exec.proto.helper.QueryIdHelper;
 import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
 import org.apache.drill.exec.work.CancelableQuery;
 import org.apache.drill.exec.work.StatusProvider;
-
-import com.codahale.metrics.Timer;
 import org.apache.drill.exec.work.WorkManager.WorkerBee;
 
 /**
@@ -49,6 +47,7 @@ public class FragmentExecutor implements Runnable, 
CancelableQuery, StatusProvid
   private final WorkerBee bee;
   private final StatusReporter listener;
   private Thread executionThread;
+  private AtomicBoolean closed = new AtomicBoolean(false);
 
   public FragmentExecutor(FragmentContext context, WorkerBee bee, FragmentRoot 
rootOperator, StatusReporter listener){
     this.context = context;
@@ -104,10 +103,14 @@ public class FragmentExecutor implements Runnable, 
CancelableQuery, StatusProvid
       while (state.get() == FragmentState.RUNNING_VALUE) {
         if (!root.next()) {
           if (context.isFailed()){
-            updateState(FragmentState.RUNNING, FragmentState.FAILED, false);
+            internalFail(context.getFailureCause());
+            closeOutResources(false);
           } else {
+            closeOutResources(true); // make sure to close out resources 
before we report success.
             updateState(FragmentState.RUNNING, FragmentState.FINISHED, false);
           }
+
+          break;
         }
       }
     } catch (AssertionError | Exception e) {
@@ -116,17 +119,32 @@ public class FragmentExecutor implements Runnable, 
CancelableQuery, StatusProvid
       internalFail(e);
     } finally {
       bee.removeFragment(context.getHandle());
-      if (context.isFailed()) {
-        internalFail(context.getFailureCause());
-      }
-      root.stop(); // stop root executor & clean-up resources
-      context.close();
 
       logger.debug("Fragment runner complete. {}:{}", 
context.getHandle().getMajorFragmentId(), 
context.getHandle().getMinorFragmentId());
       Thread.currentThread().setName(originalThread);
     }
   }
 
+  private void closeOutResources(boolean throwFailure){
+    if(closed.get()) return;
+
+    try{
+      root.stop();
+    }catch(RuntimeException e){
+      if(throwFailure) throw e;
+      logger.warn("Failure while closing out resources.", e);
+    }
+
+    try{
+      context.close();
+    }catch(RuntimeException e){
+      if(throwFailure) throw e;
+      logger.warn("Failure while closing out resources.", e);
+    }
+
+    closed.set(true);
+  }
+
   private void internalFail(Throwable excep){
     state.set(FragmentState.FAILED_VALUE);
     listener.fail(context.getHandle(), "Failure while running fragment.", 
excep);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/15eeb9d3/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java 
b/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
index 0ac2c09..d9e8b20 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
@@ -47,6 +47,19 @@ public class TestExampleQueries extends BaseTestQuery{
   }
 
   @Test
+  public void testJoinMerge() throws Exception{
+    test("alter session set `planner.enable_hashjoin` = false");
+    test("select count(*) \n" +
+        "  from (select l.l_orderkey as x, c.c_custkey as y \n" +
+        "  from cp.`tpch/lineitem.parquet` l \n" +
+        "    left outer join cp.`tpch/customer.parquet` c \n" +
+        "      on l.l_orderkey = c.c_custkey) as foo\n" +
+        "  where x < 10000\n" +
+        "");
+    test("alter session set `planner.enable_hashjoin` = true");
+  }
+
+  @Test
   public void testSelStarOrderBy() throws Exception{
     test("select * from cp.`employee.json` order by last_name");
   }

Reply via email to