Fix merge join to handle the case where the right batch has zero records
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/ba123e69 Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/ba123e69 Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/ba123e69 Branch: refs/heads/master Commit: ba123e69da6bdf4977e9db151236c3903ebcc028 Parents: dcc102a Author: Steven Phillips <[email protected]> Authored: Wed Apr 2 22:20:25 2014 -0700 Committer: Jacques Nadeau <[email protected]> Committed: Sat Apr 19 18:07:11 2014 -0700 ---------------------------------------------------------------------- .../exec/physical/config/MergeJoinPOP.java | 1 + .../exec/physical/impl/join/JoinStatus.java | 20 +++- .../exec/physical/impl/join/MergeJoinBatch.java | 107 ++++++++++++------- .../exec/physical/impl/join/TestMergeJoin.java | 55 +++++++++- .../resources/join/merge_join_empty_batch.json | 1 + 5 files changed, 135 insertions(+), 49 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ba123e69/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergeJoinPOP.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergeJoinPOP.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergeJoinPOP.java index a9a0f6d..047bed7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergeJoinPOP.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergeJoinPOP.java @@ -62,6 +62,7 @@ public class MergeJoinPOP extends AbstractBase{ this.right = right; this.conditions = conditions; this.joinType = joinType; + Preconditions.checkArgument(joinType != JoinRelType.FULL, "Full outer join not currently supported"); } @Override 
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ba123e69/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java index 5a83b48..baa232e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java @@ -23,6 +23,7 @@ import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.RecordBatch.IterOutcome; import org.apache.drill.exec.record.VectorWrapper; import org.apache.drill.exec.record.selection.SelectionVector4; +import org.eigenbase.rel.JoinRelType; /** * The status of the current join. Maintained outside the individually compiled join templates so that we can carry status across multiple schemas. 
@@ -48,6 +49,8 @@ public final class JoinStatus { public MergeJoinBatch outputBatch; public SelectionVector4 sv4; + private final JoinRelType joinType; + public boolean ok = true; private boolean initialSet = false; private boolean leftRepeating = false; @@ -57,6 +60,7 @@ public final class JoinStatus { this.left = left; this.right = right; this.outputBatch = output; + this.joinType = output.getJoinType(); } public final void ensureInitial(){ @@ -204,17 +208,25 @@ public final class JoinStatus { public JoinOutcome getOutcome(){ if (!ok) return JoinOutcome.FAILURE; - if (lastLeft == IterOutcome.OK && lastRight == IterOutcome.OK) + if (bothMatches(IterOutcome.NONE) || + (joinType == JoinRelType.INNER && eitherMatches(IterOutcome.NONE)) || + (joinType == JoinRelType.LEFT && lastLeft == IterOutcome.NONE) || + (joinType == JoinRelType.RIGHT && lastRight == IterOutcome.NONE)) + return JoinOutcome.NO_MORE_DATA; + if (bothMatches(IterOutcome.OK) || + (eitherMatches(IterOutcome.NONE) && eitherMatches(IterOutcome.OK))) return JoinOutcome.BATCH_RETURNED; - if (eitherMatches(IterOutcome.NONE)) - return JoinOutcome.NO_MORE_DATA; if (eitherMatches(IterOutcome.OK_NEW_SCHEMA)) return JoinOutcome.SCHEMA_CHANGED; if (eitherMatches(IterOutcome.NOT_YET)) return JoinOutcome.WAITING; return JoinOutcome.FAILURE; } - + + private boolean bothMatches(IterOutcome outcome){ + return lastLeft == outcome && lastRight == outcome; + } + private boolean eitherMatches(IterOutcome outcome){ return lastLeft == outcome || lastRight == outcome; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ba123e69/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java index 0cb2f7d..bbdfbe5 100644 --- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java @@ -25,7 +25,10 @@ import java.util.List; import org.apache.drill.common.expression.ErrorCollector; import org.apache.drill.common.expression.ErrorCollectorImpl; import org.apache.drill.common.expression.LogicalExpression; +import org.apache.drill.common.expression.TypedNullConstant; import org.apache.drill.common.logical.data.JoinCondition; +import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.common.types.Types; import org.apache.drill.exec.compile.sig.MappingSet; import org.apache.drill.exec.exception.ClassTransformationException; import org.apache.drill.exec.exception.SchemaChangeException; @@ -108,10 +111,14 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> { } this.left = left; this.right = right; + this.joinType = popConfig.getJoinType(); this.status = new JoinStatus(left, right, this); this.batchBuilder = new MergeJoinBatchBuilder(context, status); - this.joinType = popConfig.getJoinType(); - this.conditions = popConfig.getConditions(); + this.conditions = popConfig.getConditions(); + } + + public JoinRelType getJoinType() { + return joinType; } @Override @@ -319,19 +326,21 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> { ////////////////////// cg.setMappingSet(copyLeftMapping); int vectorId = 0; - for (VectorWrapper<?> vw : left) { - JVar vvIn = cg.declareVectorValueSetupAndMember("incomingLeft", - new TypedFieldId(vw.getField().getType(), vectorId)); - JVar vvOut = cg.declareVectorValueSetupAndMember("outgoing", - new TypedFieldId(vw.getField().getType(),vectorId)); - // todo: check result of copyFromSafe and grow allocation - cg.getEvalBlock()._if(vvOut.invoke("copyFromSafe") - .arg(copyLeftMapping.getValueReadIndex()) - .arg(copyLeftMapping.getValueWriteIndex()) - 
.arg(vvIn).eq(JExpr.FALSE)) - ._then() - ._return(JExpr.FALSE); - ++vectorId; + if (status.isLeftPositionAllowed()) { + for (VectorWrapper<?> vw : left) { + JVar vvIn = cg.declareVectorValueSetupAndMember("incomingLeft", + new TypedFieldId(vw.getField().getType(), vectorId)); + JVar vvOut = cg.declareVectorValueSetupAndMember("outgoing", + new TypedFieldId(vw.getField().getType(),vectorId)); + // todo: check result of copyFromSafe and grow allocation + cg.getEvalBlock()._if(vvOut.invoke("copyFromSafe") + .arg(copyLeftMapping.getValueReadIndex()) + .arg(copyLeftMapping.getValueWriteIndex()) + .arg(vvIn).eq(JExpr.FALSE)) + ._then() + ._return(JExpr.FALSE); + ++vectorId; + } } cg.getEvalBlock()._return(JExpr.lit(true)); @@ -340,19 +349,21 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> { cg.setMappingSet(copyRightMappping); int rightVectorBase = vectorId; - for (VectorWrapper<?> vw : right) { - JVar vvIn = cg.declareVectorValueSetupAndMember("incomingRight", - new TypedFieldId(vw.getField().getType(), vectorId - rightVectorBase)); - JVar vvOut = cg.declareVectorValueSetupAndMember("outgoing", - new TypedFieldId(vw.getField().getType(),vectorId)); - // todo: check result of copyFromSafe and grow allocation - cg.getEvalBlock()._if(vvOut.invoke("copyFromSafe") - .arg(copyRightMappping.getValueReadIndex()) - .arg(copyRightMappping.getValueWriteIndex()) - .arg(vvIn).eq(JExpr.FALSE)) - ._then() - ._return(JExpr.FALSE); - ++vectorId; + if (status.isRightPositionAllowed()) { + for (VectorWrapper<?> vw : right) { + JVar vvIn = cg.declareVectorValueSetupAndMember("incomingRight", + new TypedFieldId(vw.getField().getType(), vectorId - rightVectorBase)); + JVar vvOut = cg.declareVectorValueSetupAndMember("outgoing", + new TypedFieldId(vw.getField().getType(),vectorId)); + // todo: check result of copyFromSafe and grow allocation + cg.getEvalBlock()._if(vvOut.invoke("copyFromSafe") + .arg(copyRightMappping.getValueReadIndex()) + 
.arg(copyRightMappping.getValueWriteIndex()) + .arg(vvIn).eq(JExpr.FALSE)) + ._then() + ._return(JExpr.FALSE); + ++vectorId; + } } cg.getEvalBlock()._return(JExpr.lit(true)); @@ -366,19 +377,25 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> { container.clear(); //estimation of joinBatchSize : max of left/right size, expanded by a factor of 16, which is then bounded by MAX_BATCH_SIZE. - int joinBatchSize = Math.min(Math.max(left.getRecordCount() , right.getRecordCount() ) * 16, MAX_BATCH_SIZE); + int leftCount = status.isLeftPositionAllowed() ? left.getRecordCount() : 0; + int rightCount = status.isRightPositionAllowed() ? right.getRecordCount() : 0; + int joinBatchSize = Math.min(Math.max(leftCount, rightCount) * 16, MAX_BATCH_SIZE); - // add fields from both batches - for (VectorWrapper<?> w : left) { - ValueVector outgoingVector = TypeHelper.getNewVector(w.getField(), context.getAllocator()); - VectorAllocator.getAllocator(outgoingVector, (int) Math.ceil(w.getValueVector().getBufferSize() / left.getRecordCount())).alloc(joinBatchSize); - container.add(outgoingVector); + // add fields from both batches + if (leftCount > 0) { + for (VectorWrapper<?> w : left) { + ValueVector outgoingVector = TypeHelper.getNewVector(w.getField(), context.getAllocator()); + VectorAllocator.getAllocator(outgoingVector, (int) Math.ceil(w.getValueVector().getBufferSize() / left.getRecordCount())).alloc(joinBatchSize); + container.add(outgoingVector); + } } - for (VectorWrapper<?> w : right) { - ValueVector outgoingVector = TypeHelper.getNewVector(w.getField(), context.getAllocator()); - VectorAllocator.getAllocator(outgoingVector, (int) Math.ceil(w.getValueVector().getBufferSize() / right.getRecordCount())).alloc(joinBatchSize); - container.add(outgoingVector); + if (rightCount > 0) { + for (VectorWrapper<?> w : right) { + ValueVector outgoingVector = TypeHelper.getNewVector(w.getField(), context.getAllocator()); + VectorAllocator.getAllocator(outgoingVector, 
(int) Math.ceil(w.getValueVector().getBufferSize() / right.getRecordCount())).alloc(joinBatchSize); + container.add(outgoingVector); + } } container.buildSchema(BatchSchema.SelectionVectorMode.NONE); @@ -395,12 +412,22 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> { final LogicalExpression rightFieldExpr = condition.getRight(); // materialize value vector readers from join expression - final LogicalExpression materializedLeftExpr = ExpressionTreeMaterializer.materialize(leftFieldExpr, left, collector, context.getFunctionRegistry()); + LogicalExpression materializedLeftExpr; + if (status.isLeftPositionAllowed()) { + materializedLeftExpr = ExpressionTreeMaterializer.materialize(leftFieldExpr, left, collector, context.getFunctionRegistry()); + } else { + materializedLeftExpr = new TypedNullConstant(Types.optional(MinorType.INT)); + } if (collector.hasErrors()) throw new ClassTransformationException(String.format( "Failure while trying to materialize incoming left field. Errors:\n %s.", collector.toErrorString())); - final LogicalExpression materializedRightExpr = ExpressionTreeMaterializer.materialize(rightFieldExpr, right, collector, context.getFunctionRegistry()); + LogicalExpression materializedRightExpr; + if (status.isRightPositionAllowed()) { + materializedRightExpr = ExpressionTreeMaterializer.materialize(rightFieldExpr, right, collector, context.getFunctionRegistry()); + } else { + materializedRightExpr = new TypedNullConstant(Types.optional(MinorType.INT)); + } if (collector.hasErrors()) throw new ClassTransformationException(String.format( "Failure while trying to materialize incoming right field. 
Errors:\n %s.", collector.toErrorString())); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ba123e69/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoin.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoin.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoin.java index 8e92181..1623b86 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoin.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoin.java @@ -53,6 +53,7 @@ import org.apache.drill.exec.server.RemoteServiceSet; import org.apache.drill.exec.store.StoragePluginRegistry; import org.apache.drill.exec.vector.ValueVector; import org.junit.AfterClass; +import org.junit.Ignore; import org.junit.Test; import com.beust.jcommander.internal.Lists; @@ -299,7 +300,29 @@ public class TestMergeJoin extends PopUnitTestBase { } @Test - public void testMergeJoinEmptyBatch() throws Exception { + public void testMergeJoinInnerEmptyBatch() throws Exception { + RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet(); + + try(Drillbit bit1 = new Drillbit(CONFIG, serviceSet); + DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator());) { + + bit1.run(); + client.connect(); + List<QueryResultBatch> results = client.runQuery(UserProtos.QueryType.PHYSICAL, + Files.toString(FileUtils.getResourceAsFile("/join/merge_join_empty_batch.json"), + Charsets.UTF_8) + .replace("${JOIN_TYPE}", "INNER")); + int count = 0; + for(QueryResultBatch b : results) { + if (b.getHeader().getRowCount() != 0) + count += b.getHeader().getRowCount(); + } + assertEquals(0, count); + } + } + + @Test + public void testMergeJoinLeftEmptyBatch() throws Exception { RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet(); try(Drillbit bit1 = new 
Drillbit(CONFIG, serviceSet); @@ -309,7 +332,30 @@ public class TestMergeJoin extends PopUnitTestBase { client.connect(); List<QueryResultBatch> results = client.runQuery(UserProtos.QueryType.PHYSICAL, Files.toString(FileUtils.getResourceAsFile("/join/merge_join_empty_batch.json"), - Charsets.UTF_8)); + Charsets.UTF_8) + .replace("${JOIN_TYPE}", "LEFT")); + int count = 0; + for(QueryResultBatch b : results) { + if (b.getHeader().getRowCount() != 0) + count += b.getHeader().getRowCount(); + } + assertEquals(50, count); + } + } + + @Test + public void testMergeJoinRightEmptyBatch() throws Exception { + RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet(); + + try(Drillbit bit1 = new Drillbit(CONFIG, serviceSet); + DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator());) { + + bit1.run(); + client.connect(); + List<QueryResultBatch> results = client.runQuery(UserProtos.QueryType.PHYSICAL, + Files.toString(FileUtils.getResourceAsFile("/join/merge_join_empty_batch.json"), + Charsets.UTF_8) + .replace("${JOIN_TYPE}", "RIGHT")); int count = 0; for(QueryResultBatch b : results) { if (b.getHeader().getRowCount() != 0) @@ -317,9 +363,8 @@ public class TestMergeJoin extends PopUnitTestBase { } assertEquals(0, count); } - } - - + } + @AfterClass public static void tearDown() throws Exception{ // pause to get logger to catch up. 
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ba123e69/exec/java-exec/src/test/resources/join/merge_join_empty_batch.json ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/resources/join/merge_join_empty_batch.json b/exec/java-exec/src/test/resources/join/merge_join_empty_batch.json index 549cb83..19254e3 100644 --- a/exec/java-exec/src/test/resources/join/merge_join_empty_batch.json +++ b/exec/java-exec/src/test/resources/join/merge_join_empty_batch.json @@ -36,6 +36,7 @@ right: 1, left: 2, pop: "merge-join", + join-type: "${JOIN_TYPE}", join-conditions: [ {relationship: "==", left: "blue1", right: "blue"} ] }, {
