DRILL-892: Send Batch is leaking memory when send fails to establish connection to remote fragment.
Also: 1. Maitain one StatusHandler for all OutgoingRecordBatches in Partitioner. 2. In FragmentExecutor check for failures set in FragementContext. Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/e62c3650 Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/e62c3650 Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/e62c3650 Branch: refs/heads/master Commit: e62c3650d2c882bd2cf354d7a0dbc506a58fc051 Parents: c6c3cd5 Author: vkorukanti <[email protected]> Authored: Mon Jun 2 17:43:57 2014 -0700 Committer: Jacques Nadeau <[email protected]> Committed: Thu Jun 5 09:35:08 2014 -0700 ---------------------------------------------------------------------- .../partitionsender/PartitionSenderRootExec.java | 9 ++++++++- .../physical/impl/partitionsender/Partitioner.java | 3 ++- .../impl/partitionsender/PartitionerTemplate.java | 16 ++++++++-------- .../org/apache/drill/exec/rpc/data/DataTunnel.java | 11 +++++++++-- .../drill/exec/work/fragment/FragmentExecutor.java | 16 ++++++++++++---- 5 files changed, 39 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e62c3650/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java index 74a3c90..ffb3780 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java @@ -61,6 +61,7 @@ public class PartitionSenderRootExec implements RootExec { private final OperatorStats stats; private final int outGoingBatchCount; private final HashPartitionSender popConfig; + private final StatusHandler statusHandler; public PartitionSenderRootExec(FragmentContext context, @@ -74,6 +75,7 @@ public class PartitionSenderRootExec implements RootExec { this.stats = oContext.getStats(); this.outGoingBatchCount = operator.getDestinations().size(); this.popConfig = operator; + this.statusHandler = new StatusHandler(sendCount, context); } @Override @@ -183,7 +185,7 @@ public class PartitionSenderRootExec implements RootExec { // compile and setup generated code // partitioner = context.getImplementationClassMultipleOutput(cg); partitioner = context.getImplementationClass(cg); - partitioner.setup(context, incoming, popConfig, stats, sendCount, oContext); + partitioner.setup(context, incoming, popConfig, stats, sendCount, oContext, statusHandler); } catch (ClassTransformationException | IOException e) { throw new SchemaChangeException("Failure while attempting to load generated class", e); @@ -197,6 +199,11 @@ public class PartitionSenderRootExec implements RootExec { partitioner.clear(); } sendCount.waitForSendComplete(); + + if (!statusHandler.isOk()) { + context.fail(statusHandler.getException()); + } + oContext.close(); incoming.cleanup(); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e62c3650/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java index 8d6c19a..6958403 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java @@ -35,7 +35,8 @@ public interface Partitioner { HashPartitionSender popConfig, OperatorStats stats, SendingAccountor sendingAccountor, - OperatorContext oContext) throws SchemaChangeException; + OperatorContext oContext, + StatusHandler statusHandler) throws SchemaChangeException; public abstract void partitionBatch(RecordBatch incoming) throws IOException; public abstract void flushOutgoingBatches(boolean isLastBatch, boolean schemaChanged) throws IOException; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e62c3650/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java index 4a27262..510327a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java @@ -71,7 +71,8 @@ public abstract class PartitionerTemplate implements Partitioner { HashPartitionSender popConfig, OperatorStats stats, SendingAccountor sendingAccountor, - OperatorContext oContext) throws SchemaChangeException { + OperatorContext oContext, + StatusHandler statusHandler) throws SchemaChangeException { this.incoming = incoming; doSetup(context, incoming, null); @@ -79,7 +80,8 @@ public abstract class PartitionerTemplate implements Partitioner { int fieldId = 0; for (DrillbitEndpoint endpoint : popConfig.getDestinations()) { FragmentHandle opposite = context.getHandle().toBuilder().setMajorFragmentId(popConfig.getOppositeMajorFragmentId()).setMinorFragmentId(fieldId).build(); - outgoingBatches.add(new OutgoingRecordBatch(stats, sendingAccountor, popConfig, context.getDataTunnel(endpoint, opposite), context, oContext.getAllocator(), fieldId)); + outgoingBatches.add(new OutgoingRecordBatch(stats, sendingAccountor, popConfig, + context.getDataTunnel(endpoint, opposite), context, oContext.getAllocator(), fieldId, statusHandler)); fieldId++; } @@ -204,10 +206,11 @@ public abstract class PartitionerTemplate implements Partitioner { private static final int DEFAULT_RECORD_BATCH_SIZE = 20000; private static final int DEFAULT_VARIABLE_WIDTH_SIZE = 200; - private StatusHandler statusHandler; + private final StatusHandler statusHandler; public OutgoingRecordBatch(OperatorStats stats, SendingAccountor sendCount, HashPartitionSender operator, DataTunnel tunnel, - FragmentContext context, BufferAllocator allocator, int oppositeMinorFragmentId) { + FragmentContext context, BufferAllocator allocator, int oppositeMinorFragmentId, + StatusHandler statusHandler) { this.context = context; this.allocator = allocator; this.operator = operator; @@ -215,7 +218,7 @@ public abstract class PartitionerTemplate implements Partitioner { this.sendCount = sendCount; this.stats = stats; this.oppositeMinorFragmentId = oppositeMinorFragmentId; - this.statusHandler = new StatusHandler(sendCount, context); + this.statusHandler = statusHandler; } protected boolean copy(int inIndex) throws IOException { @@ -346,9 +349,6 @@ public abstract class PartitionerTemplate implements Partitioner { return WritableBatch.getBatchNoHVWrap(recordCount, this, false); } - - - public void clear(){ vectorContainer.clear(); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e62c3650/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java index 98bbeeb..3c2b9e3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java @@ -17,6 +17,7 @@ */ package org.apache.drill.exec.rpc.data; +import io.netty.buffer.ByteBuf; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.proto.BitData.RpcType; import org.apache.drill.exec.proto.GeneralRPCProtos.Ack; @@ -66,8 +67,14 @@ public class DataTunnel { public String toString() { return "SendBatch [batch.header=" + batch.getHeader() + "]"; } - - + + @Override + public void connectionFailed(FailureType type, Throwable t) { + for(ByteBuf buffer : batch.getBuffers()) { + buffer.release(); + } + super.connectionFailed(type, t); + } } private static class SendBatchAsyncFuture extends FutureBitCommand<Ack, DataClientConnection> { http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e62c3650/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java index 70f5dd0..11685c0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java @@ -106,6 +106,9 @@ public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvid } root.stop(); + if(context.isFailed()) { + internalFail(context.getFailureCause()); + } closed = true; @@ -115,10 +118,15 @@ public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvid internalFail(ex); }finally{ Thread.currentThread().setName(originalThread); - if(!closed) try{ - context.close(); - }catch(RuntimeException e){ - logger.warn("Failure while closing context in failed state.", e); + if(!closed) { + try { + if(context.isFailed()) { + internalFail(context.getFailureCause()); + } + context.close(); + } catch (RuntimeException e) { + logger.warn("Failure while closing context in failed state.", e); + } } } logger.debug("Fragment runner complete. {}:{}", context.getHandle().getMajorFragmentId(), context.getHandle().getMinorFragmentId());
