Repository: drill
Updated Branches:
  refs/heads/master 72f946964 -> 68aa81f47
DRILL-3133: MergingRecordBatch can leak memory if query is canceled before batches in rawBatches were loaded

Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/68aa81f4
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/68aa81f4
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/68aa81f4

Branch: refs/heads/master
Commit: 68aa81f471d2cea779f7f2acd7d84ce43bb1b94b
Parents: 72f9469
Author: adeneche <[email protected]>
Authored: Mon May 18 10:01:52 2015 -0700
Committer: adeneche <[email protected]>
Committed: Thu Jul 9 16:19:28 2015 -0700

----------------------------------------------------------------------
 .../impl/mergereceiver/MergingRecordBatch.java | 70 ++++++++++----------
 1 file changed, 35 insertions(+), 35 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/drill/blob/68aa81f4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
index 3ca11f1..49e81ec 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
@@ -96,7 +96,6 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
   private RecordBatchLoader[] batchLoaders;
   private final RawFragmentBatchProvider[] fragProviders;
   private final FragmentContext context;
-  private BatchSchema schema;
   private VectorContainer outgoingContainer;
   private MergingReceiverGeneratorBase merger;
   private final MergingReceiverPOP config;
@@ -139,7 +138,7 @@
public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
     this.outputCounts = new long[config.getNumSenders()];
   }
 
-  private RawFragmentBatch getNext(final int providerIndex) throws IOException{
+  private RawFragmentBatch getNext(final int providerIndex) throws IOException {
     stats.startWait();
     final RawFragmentBatchProvider provider = fragProviders[providerIndex];
     try {
@@ -162,6 +161,14 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
     }
   }
 
+  private void clearBatches(List<RawFragmentBatch> batches) {
+    for (RawFragmentBatch batch : batches) {
+      if (batch != null) {
+        batch.release();
+      }
+    }
+  }
+
   @Override
   public IterOutcome innerNext() {
     if (fragProviders.length == 0) {
@@ -190,22 +197,25 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
       final List<RawFragmentBatch> rawBatches = Lists.newArrayList();
       int p = 0;
       for (final RawFragmentBatchProvider provider : fragProviders) {
-        RawFragmentBatch rawBatch = null;
-        try {
-          // check if there is a batch in temp holder before calling getNext(), as it may have been used when building schema
-          if (tempBatchHolder[p] != null) {
-            rawBatch = tempBatchHolder[p];
-            tempBatchHolder[p] = null;
-          } else {
+        RawFragmentBatch rawBatch;
+        // check if there is a batch in temp holder before calling getNext(), as it may have been used when building schema
+        if (tempBatchHolder[p] != null) {
+          rawBatch = tempBatchHolder[p];
+          tempBatchHolder[p] = null;
+        } else {
+          try {
             rawBatch = getNext(p);
-          }
-          if (rawBatch == null && !context.shouldContinue()) {
+          } catch (final IOException e) {
+            context.fail(e);
             return IterOutcome.STOP;
           }
-        } catch (final IOException e) {
-          context.fail(e);
+        }
+        if (rawBatch == null && !context.shouldContinue()) {
+          clearBatches(rawBatches);
           return IterOutcome.STOP;
         }
+
+        assert rawBatch != null : "rawBatch is null although context.shouldContinue() == true";
         if (rawBatch.getHeader().getDef().getRecordCount() != 0) {
           rawBatches.add(rawBatch);
         } else {
@@ -215,13 +225,15 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
           }
           try {
             while ((rawBatch = getNext(p)) != null && rawBatch.getHeader().getDef().getRecordCount() == 0) {
-              ;
+              // Do nothing
             }
             if (rawBatch == null && !context.shouldContinue()) {
+              clearBatches(rawBatches);
               return IterOutcome.STOP;
             }
           } catch (final IOException e) {
             context.fail(e);
+            clearBatches(rawBatches);
             return IterOutcome.STOP;
           }
           if (rawBatch != null) {
@@ -243,6 +255,9 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
         batchLoaders[i] = new RecordBatchLoader(oContext.getAllocator());
       }
 
+      // after this point all batches have moved to incomingBatches
+      rawBatches.clear();
+
       int i = 0;
       for (final RawFragmentBatch batch : incomingBatches) {
         // initialize the incoming batchLoaders
@@ -261,6 +276,8 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
         ++i;
       }
 
+      // after this point all batches have been released and their bytebuf are in batchLoaders
+
       // Canonicalize each incoming batch, so that vectors are alphabetically sorted based on SchemaPath.
       for (final RecordBatchLoader loader : batchLoaders) {
         loader.canonicalize();
@@ -268,31 +285,22 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
 
       // Ensure all the incoming batches have the identical schema.
       if (!isSameSchemaAmongBatches(batchLoaders)) {
-        logger.error("Incoming batches for merging receiver have diffferent schemas!");
-        context.fail(new SchemaChangeException("Incoming batches for merging receiver have diffferent schemas!"));
+        context.fail(new SchemaChangeException("Incoming batches for merging receiver have different schemas!"));
         return IterOutcome.STOP;
       }
 
       // create the outgoing schema and vector container, and allocate the initial batch
       final SchemaBuilder bldr = BatchSchema.newBuilder().setSelectionVectorMode(BatchSchema.SelectionVectorMode.NONE);
-      int vectorCount = 0;
       for (final VectorWrapper<?> v : batchLoaders[0]) {
 
         // add field to the output schema
         bldr.addField(v.getField());
 
         // allocate a new value vector
-        final ValueVector outgoingVector = outgoingContainer.addOrGet(v.getField());
-        ++vectorCount;
+        outgoingContainer.addOrGet(v.getField());
       }
 
       allocateOutgoing();
-
-      schema = bldr.build();
-      if (schema != null && !schema.equals(schema)) {
-        // TODO: handle case where one or more batches implicitly indicate schema change
-        logger.debug("Initial state has incoming batches with different schemas");
-      }
       outgoingContainer.buildSchema(BatchSchema.SelectionVectorMode.NONE);
 
       // generate code for merge operations (copy and compare)
@@ -305,7 +313,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
       }
 
       // allocate the priority queue with the generated comparator
-      this.pqueue = new PriorityQueue<Node>(fragProviders.length, new Comparator<Node>() {
+      this.pqueue = new PriorityQueue<>(fragProviders.length, new Comparator<Node>() {
         public int compare(final Node node1, final Node node2) {
           final int leftIndex = (node1.batchId << 16) + node1.valueIndex;
           final int rightIndex = (node2.batchId << 16) + node2.valueIndex;
@@ -321,8 +329,6 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
             incomingBatches[b] = batch;
             if (batch != null) {
               batchLoaders[b].load(batch.getHeader().getDef(),
batch.getBody());
-              // TODO: Clean: DRILL-2933: That load(...) no longer throws
-              // SchemaChangeException, so check/clean catch clause below.
             } else {
               batchLoaders[b].clear();
               batchLoaders[b] = null;
@@ -353,15 +359,9 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
       }
       pqueue.poll();
 
-//      if (isOutgoingFull()) {
-//        // set a flag so that we reallocate on the next iteration
-//        logger.debug("Outgoing vectors record batch size reached; breaking");
-//        prevBatchWasFull = true;
-//      }
-
       if (node.valueIndex == batchLoaders[node.batchId].getRecordCount() - 1) {
         // reached the end of an incoming record batch
-        RawFragmentBatch nextBatch = null;
+        RawFragmentBatch nextBatch;
         try {
           nextBatch = getNext(node.batchId);
