DRILL-1107: Handle case in Hash join where first batch has zero records. Handle case in merging receiver when first batch has zero records.
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/e0de4650 Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/e0de4650 Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/e0de4650 Branch: refs/heads/master Commit: e0de4650a112eba6db31685152c132cf6015cc56 Parents: 1e1e438 Author: Steven Phillips <[email protected]> Authored: Wed Jul 2 23:47:53 2014 -0700 Committer: Jacques Nadeau <[email protected]> Committed: Thu Jul 3 08:59:58 2014 -0700 ---------------------------------------------------------------------- .../exec/physical/impl/join/HashJoinBatch.java | 10 ++++++- .../impl/join/HashJoinProbeTemplate.java | 2 +- .../impl/mergereceiver/MergingRecordBatch.java | 28 ++++++++++++++------ .../work/batch/UnlimitedRawBatchBuffer.java | 7 ++++- 4 files changed, 36 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e0de4650/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java index 2e33d50..e24f250 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java @@ -108,6 +108,8 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> { private boolean first = true; + private boolean done = false; + // Generator mapping for the build side private static final GeneratorMapping PROJECT_BUILD = GeneratorMapping.create("doSetup"/* setup method */, "projectBuildRecord" /* eval method */, @@ -160,6 +162,9 @@ public 
class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> { @Override public IterOutcome innerNext() { + if (done) { + return IterOutcome.NONE; + } try { /* If we are here for the first time, execute the build phase of the * hash join and setup the run time generated class for the probe side @@ -238,8 +243,11 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> { } // No more output records, clean up and return + done = true; + if (first) { + return IterOutcome.OK_NEW_SCHEMA; + } return IterOutcome.NONE; - } catch (ClassTransformationException | SchemaChangeException | IOException e) { context.fail(e); killIncoming(); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e0de4650/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java index 9c84e54..0b90362 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java @@ -107,7 +107,7 @@ public abstract class HashJoinProbeTemplate implements HashJoinProbe { } public void executeProbePhase() throws SchemaChangeException { - while (outputRecords < TARGET_RECORDS_PER_BATCH && recordsToProcess > 0) { + while (outputRecords < TARGET_RECORDS_PER_BATCH && probeState != ProbeState.DONE && probeState != ProbeState.PROJECT_RIGHT) { // Check if we have processed all records in this batch we need to invoke next if (recordsProcessed == recordsToProcess) { http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e0de4650/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java 
---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java index ace1539..e83f461 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java @@ -137,8 +137,12 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> @Override public IterOutcome innerNext() { - if (fragProviders.length == 0) return IterOutcome.NONE; - if (done) return IterOutcome.NONE; + if (fragProviders.length == 0) { + return IterOutcome.NONE; + } + if (done) { + return IterOutcome.NONE; + } boolean schemaChanged = false; if (prevBatchWasFull) { @@ -171,11 +175,19 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> } if (rawBatch.getHeader().getDef().getRecordCount() != 0) { rawBatches.add(rawBatch); - } else if (emptyBatch == null) { - emptyBatch = rawBatch; - } - if (firstBatch) { - schema = BatchSchema.newBuilder().addSerializedFields(rawBatch.getHeader().getDef().getFieldList()).build(); + } else { + if (emptyBatch == null) { + emptyBatch = rawBatch; + } + try { + while ((rawBatch = getNext(provider)) != null && rawBatch.getHeader().getDef().getRecordCount() == 0); + } catch (IOException e) { + context.fail(e); + return IterOutcome.STOP; + } + if (rawBatch != null) { + rawBatches.add(rawBatch); + } } } @@ -375,7 +387,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> @Override public BatchSchema getSchema() { - return schema; + return outgoingContainer.getSchema(); } @Override 
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e0de4650/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java index 9d24c66..41d70a5 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java @@ -77,7 +77,9 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{ @Override public void cleanup() { if (!finished) { - context.fail(new IllegalStateException("Cleanup before finished")); + IllegalStateException e = new IllegalStateException("Cleanup before finished"); + context.fail(e); + throw e; } if (!buffer.isEmpty()) { @@ -158,6 +160,9 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{ if (b == null && buffer.size() > 0) { throw new IllegalStateException("Returning null when there are batches left in queue"); } + if (b == null && !finished) { + throw new IllegalStateException("Returning null when not finished"); + } return b; }
