DRILL-1105: Fix bug in streaming aggregate when first batch is empty
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/dedff8ca
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/dedff8ca
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/dedff8ca

Branch: refs/heads/master
Commit: dedff8ca8559dba758326bf8cec95b6a4be05416
Parents: 405abb2
Author: Steven Phillips <[email protected]>
Authored: Mon Jun 30 14:05:31 2014 -0700
Committer: Aditya Kishore <[email protected]>
Committed: Thu Jul 3 02:11:24 2014 -0700

----------------------------------------------------------------------
 .../impl/aggregate/StreamingAggTemplate.java | 29 ++++++++++++++++++++
 1 file changed, 29 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/dedff8ca/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
index 8a9ba3b..6ed37e7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
@@ -47,6 +47,7 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
   private StreamingAggBatch outgoing;
   private FragmentContext context;
   private InternalBatch remainderBatch;
+  private boolean done = false;
 
 
   @Override
@@ -84,6 +85,10 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
 
   @Override
   public AggOutcome doWork() {
+    if (done) {
+      outcome = IterOutcome.NONE;
+      return AggOutcome.CLEANUP_AND_RETURN;
+    }
     try{ // outside loop to ensure that first is set to false after the first run.
       outputCount = 0;
 
@@ -92,6 +97,29 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
         allocateOutgoing();
       }
 
+      if (incoming.getRecordCount() == 0) {
+        outer: while (true) {
+          IterOutcome out = outgoing.next(0, incoming);
+          switch (out) {
+            case OK_NEW_SCHEMA:
+            case OK:
+              if (incoming.getRecordCount() == 0) {
+                continue;
+              } else {
+                break outer;
+              }
+            case NONE:
+              out = IterOutcome.OK_NEW_SCHEMA;
+            case STOP:
+            default:
+              lastOutcome = out;
+              outcome = out;
+              done = true;
+              return AggOutcome.CLEANUP_AND_RETURN;
+          }
+        }
+      }
+
       // pick up a remainder batch if we have one.
       if(remainderBatch != null){
         if (!outputToBatch( previousIndex )) return tooBigFailure();
@@ -162,6 +190,7 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
             if(EXTRA_DEBUG) logger.debug("Received IterOutcome of {}", out);
             switch(out){
             case NONE:
+              done = true;
               lastOutcome = out;
               if (first && addedRecordCount == 0) {
                 return setOkAndReturn();
