DRILL-1107: Fix regression from first patch
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/810a2040 Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/810a2040 Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/810a2040 Branch: refs/heads/master Commit: 810a20409089a10ae984807ff4418bfe7ee18957 Parents: 0b905fe Author: Steven Phillips <[email protected]> Authored: Mon Jul 7 14:26:00 2014 -0700 Committer: Jacques Nadeau <[email protected]> Committed: Mon Jul 7 15:53:33 2014 -0700 ---------------------------------------------------------------------- .../impl/mergereceiver/MergingRecordBatch.java | 28 +++++++++++++++++--- 1 file changed, 25 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/810a2040/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java index e83f461..914a187 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java @@ -187,6 +187,8 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> } if (rawBatch != null) { rawBatches.add(rawBatch); + } else { + rawBatches.add(emptyBatch); } } } @@ -288,8 +290,26 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> }); // populate the priority queue with initial values - for (int b = 0; b < senderCount; ++b) - pqueue.add(new Node(b, 0)); + for (int b = 0; b < senderCount; ++b) { + while (batchLoaders[b] != null && batchLoaders[b].getRecordCount() == 0) { + try { + RawFragmentBatch batch = getNext(fragProviders[b]); + incomingBatches[b] = batch; + if (batch != null) { + batchLoaders[b].load(batch.getHeader().getDef(), batch.getBody()); + } else { + batchLoaders[b].clear(); + batchLoaders[b] = null; + } + } catch (IOException | SchemaChangeException e) { + context.fail(e); + return IterOutcome.STOP; + } + } + if (batchLoaders[b] != null) { + pqueue.add(new Node(b, 0)); + } + } hasRun = true; // finished lazy initialization @@ -566,7 +586,9 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> outgoingContainer.clear(); if (batchLoaders != null) { for(RecordBatchLoader rbl : batchLoaders){ - rbl.clear(); + if (rbl != null) { + rbl.clear(); + } } } oContext.close();
