Fix issue introduced by DRILL-1202 where allocators were being closed after success was reported. Update ScreenRoot to clean up before returning success. Update ScanBatch to clean up its reader in the case of a limit query, avoiding a memory leak in ParquetReader. Update allocators so that they don't leak memory when debug options are enabled. Update ProjectRecordBatch so that it doesn't try to return a released remainder.
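The common thread across these changes is releasing a fragment's resources before its final status is reported, and guarding the teardown so a later stop()/cleanup call is a no-op. The sketch below illustrates that pattern in isolation; the class and helper names are hypothetical stand-ins, not Drill's actual API.

    import java.util.concurrent.atomic.AtomicBoolean;

    // Illustrative sketch only (hypothetical names, not Drill's API): close out
    // resources before reporting success, and make the close idempotent so a
    // later stop()/cleanup call cannot double-close or report a false leak.
    class CloseBeforeSuccessSketch {

      private final AtomicBoolean closed = new AtomicBoolean(false);

      void run(AutoCloseable resources, Runnable reportSuccess) throws Exception {
        try {
          doWork();                  // placeholder for driving the operator tree
        } catch (Exception e) {
          closeQuietly(resources);   // failure path: free what we can, keep the original error
          throw e;
        }
        close(resources);            // success path: release allocations first ...
        reportSuccess.run();         // ... and only then report completion
      }

      private void doWork() { /* ... */ }

      // Idempotent close: the first caller wins, later callers return immediately.
      private void close(AutoCloseable resources) throws Exception {
        if (closed.compareAndSet(false, true)) {
          resources.close();
        }
      }

      private void closeQuietly(AutoCloseable resources) {
        try {
          close(resources);
        } catch (Exception e) {
          // swallow: the query already failed for another reason; log in real code
        }
      }
    }

FragmentExecutor.closeOutResources() and ScreenCreator.internalStop() in the diff below follow the same shape, using an AtomicBoolean and an oContext.isClosed() check respectively to keep the close idempotent.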
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/15eeb9d3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/15eeb9d3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/15eeb9d3

Branch: refs/heads/master
Commit: 15eeb9d38af517bc294d6c37c3f7dae82a33665f
Parents: 4216e0e
Author: Jacques Nadeau <[email protected]>
Authored: Sun Aug 24 10:04:47 2014 -0700
Committer: Jacques Nadeau <[email protected]>
Committed: Sun Aug 24 10:42:50 2014 -0700

----------------------------------------------------------------------
 .../drill/exec/memory/TopLevelAllocator.java    |  9 +++--
 .../apache/drill/exec/ops/FragmentContext.java  |  2 +-
 .../drill/exec/physical/impl/ScanBatch.java     |  4 +++
 .../drill/exec/physical/impl/ScreenCreator.java | 21 ++++++++----
 .../impl/project/ProjectRecordBatch.java        |  8 +++++
 .../exec/work/fragment/FragmentExecutor.java    | 36 +++++++++++++++-----
 .../org/apache/drill/TestExampleQueries.java    | 13 +++++++
 7 files changed, 73 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/15eeb9d3/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java
index 098922c..d89c892 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java
@@ -98,7 +98,7 @@ public class TopLevelAllocator implements BufferAllocator {
       throw new OutOfMemoryException(String.format("You attempted to create a new child allocator with initial reservation %d but only %d bytes of memory were available.", initialReservation, acct.getCapacity() - acct.getAllocation()));
     };
     logger.debug("New child allocator with initial reservation {}", initialReservation);
-    ChildAllocator allocator = new ChildAllocator(handle, acct, maximumReservation, initialReservation);
+    ChildAllocator allocator = new ChildAllocator(handle, acct, maximumReservation, initialReservation, childrenMap);
     if(ENABLE_ACCOUNTING) childrenMap.put(allocator, Thread.currentThread().getStackTrace());
     return allocator;
@@ -130,11 +130,13 @@ public class TopLevelAllocator implements BufferAllocator {
     private Map<ChildAllocator, StackTraceElement[]> children = new HashMap<>();
     private boolean closed = false;
     private FragmentHandle handle;
+    private Map<ChildAllocator, StackTraceElement[]> thisMap;
 
-    public ChildAllocator(FragmentHandle handle, Accountor parentAccountor, long max, long pre) throws OutOfMemoryException{
+    public ChildAllocator(FragmentHandle handle, Accountor parentAccountor, long max, long pre, Map<ChildAllocator, StackTraceElement[]> map) throws OutOfMemoryException{
       assert max >= pre;
       childAcct = new Accountor(errorOnLeak, handle, parentAccountor, max, pre);
       this.handle = handle;
+      thisMap = map;
     }
 
     @Override
@@ -171,7 +173,7 @@ public class TopLevelAllocator implements BufferAllocator {
       throw new OutOfMemoryException(String.format("You attempted to create a new child allocator with initial reservation %d but only %d bytes of memory were available.", initialReservation, childAcct.getAvailable()));
     };
     logger.debug("New child allocator with initial reservation {}", initialReservation);
-    ChildAllocator newChildAllocator = new ChildAllocator(handle, childAcct, maximumReservation, initialReservation);
+    ChildAllocator newChildAllocator = new ChildAllocator(handle, childAcct, maximumReservation, initialReservation, null);
     this.children.put(newChildAllocator, Thread.currentThread().getStackTrace());
     return newChildAllocator;
   }
@@ -183,6 +185,7 @@ public class TopLevelAllocator implements BufferAllocator {
   @Override
   public void close() {
     if (ENABLE_ACCOUNTING) {
+      if(thisMap != null) thisMap.remove(this);
       for (ChildAllocator child : children.keySet()) {
         if (!child.isClosed()) {
           StringBuilder sb = new StringBuilder();


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/15eeb9d3/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
index 8a670fb..6fccb3b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
@@ -111,7 +111,7 @@ public class FragmentContext implements Closeable {
   }
 
   public void fail(Throwable cause) {
-    logger.error("Fragment Context received failure. {}", cause);
+    logger.error("Fragment Context received failure.", cause);
     failed = true;
     failureCause = cause;
   }


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/15eeb9d3/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index 4b7e33e..9472627 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -129,6 +129,10 @@ public class ScanBatch implements RecordBatch {
 
   @Override
   public void kill(boolean sendUpstream) {
+    if(currentReader != null){
+      currentReader.cleanup();
+    }
+
     if (sendUpstream) {
       done = true;
     } else {


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/15eeb9d3/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
index 279374f..d96bdf3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
@@ -99,7 +99,7 @@ public class ScreenCreator implements RootCreator<Screen>{
 //      logger.debug("Screen Outcome {}", outcome);
       switch(outcome){
       case STOP: {
-        sendCount.waitForSendComplete();
+        this.internalStop();
         boolean verbose = context.getOptions().getOption(ExecConstants.ENABLE_VERBOSE_ERRORS_KEY).bool_val;
         QueryResult header = QueryResult.newBuilder() //
             .setQueryId(context.getHandle().getQueryId()) //
@@ -122,8 +122,7 @@
         return false;
       }
       case NONE: {
-        sendCount.waitForSendComplete();
-//        context.getStats().batchesCompleted.inc(1);
+        this.internalStop();
         QueryWritableBatch batch;
         if (!first) {
           QueryResult header = QueryResult.newBuilder() //
@@ -168,22 +167,30 @@
       throw new UnsupportedOperationException();
     }
   }
-
+
   public void updateStats(QueryWritableBatch queryBatch) {
     stats.addLongStat(Metric.BYTES_SENT, queryBatch.getByteCount());
   }
 
-  @Override
-  public void stop() {
+
+  private void internalStop(){
     sendCount.waitForSendComplete();
     oContext.close();
     incoming.cleanup();
   }
 
+  @Override
+  public void stop() {
+    if(!oContext.isClosed()){
+      internalStop();
+    }
+    sendCount.waitForSendComplete();
+  }
+
   private SendListener listener = new SendListener();
 
   private class SendListener extends BaseRpcOutcomeListener<Ack>{
-    volatile RpcException ex;
+    volatile RpcException ex;
 
     @Override


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/15eeb9d3/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index 4fdb71d..a7e1360 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -111,6 +111,14 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
     return recordCount;
   }
 
+
+  @Override
+  protected void killIncoming(boolean sendUpstream) {
+    super.killIncoming(sendUpstream);
+    hasRemainder = false;
+  }
+
+
   @Override
   public IterOutcome innerNext() {
     if (hasRemainder) {


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/15eeb9d3/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
index e4941d0..2a8db67 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
@@ -17,9 +17,9 @@
  */
 package org.apache.drill.exec.work.fragment;
 
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.base.FragmentRoot;
 import org.apache.drill.exec.physical.impl.ImplCreator;
@@ -31,8 +31,6 @@ import org.apache.drill.exec.proto.helper.QueryIdHelper;
 import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
 import org.apache.drill.exec.work.CancelableQuery;
 import org.apache.drill.exec.work.StatusProvider;
-
-import com.codahale.metrics.Timer;
 import org.apache.drill.exec.work.WorkManager.WorkerBee;
 
 /**
@@ -49,6 +47,7 @@ public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvid
   private final WorkerBee bee;
   private final StatusReporter listener;
   private Thread executionThread;
+  private AtomicBoolean closed = new AtomicBoolean(false);
 
   public FragmentExecutor(FragmentContext context, WorkerBee bee, FragmentRoot rootOperator, StatusReporter listener){
     this.context = context;
@@ -104,10 +103,14 @@ public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvid
       while (state.get() == FragmentState.RUNNING_VALUE) {
         if (!root.next()) {
           if (context.isFailed()){
-            updateState(FragmentState.RUNNING, FragmentState.FAILED, false);
+            internalFail(context.getFailureCause());
+            closeOutResources(false);
           } else {
+            closeOutResources(true); // make sure to close out resources before we report success.
             updateState(FragmentState.RUNNING, FragmentState.FINISHED, false);
           }
+
+          break;
         }
       }
     } catch (AssertionError | Exception e) {
@@ -116,17 +119,32 @@
       internalFail(e);
     } finally {
       bee.removeFragment(context.getHandle());
-      if (context.isFailed()) {
-        internalFail(context.getFailureCause());
-      }
-      root.stop(); // stop root executor & clean-up resources
-      context.close();
       logger.debug("Fragment runner complete. {}:{}", context.getHandle().getMajorFragmentId(), context.getHandle().getMinorFragmentId());
       Thread.currentThread().setName(originalThread);
     }
   }
 
+  private void closeOutResources(boolean throwFailure){
+    if(closed.get()) return;
+
+    try{
+      root.stop();
+    }catch(RuntimeException e){
+      if(throwFailure) throw e;
+      logger.warn("Failure while closing out resources.", e);
+    }
+
+    try{
+      context.close();
+    }catch(RuntimeException e){
+      if(throwFailure) throw e;
+      logger.warn("Failure while closing out resources.", e);
+    }
+
+    closed.set(true);
+  }
+
   private void internalFail(Throwable excep){
     state.set(FragmentState.FAILED_VALUE);
     listener.fail(context.getHandle(), "Failure while running fragment.", excep);


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/15eeb9d3/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java b/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
index 0ac2c09..d9e8b20 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
@@ -47,6 +47,19 @@ public class TestExampleQueries extends BaseTestQuery{
   }
 
   @Test
+  public void testJoinMerge() throws Exception{
+    test("alter session set `planner.enable_hashjoin` = false");
+    test("select count(*) \n" +
+        "  from (select l.l_orderkey as x, c.c_custkey as y \n" +
+        "  from cp.`tpch/lineitem.parquet` l \n" +
+        "    left outer join cp.`tpch/customer.parquet` c \n" +
+        "    on l.l_orderkey = c.c_custkey) as foo\n" +
+        "  where x < 10000\n" +
+        "");
+    test("alter session set `planner.enable_hashjoin` = true");
+  }
+
+  @Test
   public void testSelStarOrderBy() throws Exception{
     test("select * from cp.`employee.json` order by last_name");
   }
