DRILL-326: Fixes for merge join allocations
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/d3c01968 Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/d3c01968 Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/d3c01968 Branch: refs/heads/master Commit: d3c01968ec0afd5d188f23becefaec456d59b168 Parents: e64a682 Author: Jinfeng Ni <[email protected]> Authored: Mon Mar 17 09:01:21 2014 -0700 Committer: Jacques Nadeau <[email protected]> Committed: Mon Mar 17 09:05:28 2014 -0700 ---------------------------------------------------------------------- .../exec/physical/impl/join/JoinStatus.java | 4 + .../exec/physical/impl/join/JoinTemplate.java | 15 ++-- .../exec/physical/impl/join/MergeJoinBatch.java | 10 ++- .../IteratorValidatorBatchIterator.java | 4 + .../apache/drill/exec/record/RecordBatch.java | 3 + .../exec/physical/impl/join/TestMergeJoin.java | 26 ++++++ .../src/test/resources/join/join_batchsize.json | 88 ++++++++++++++++++++ 7 files changed, 142 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d3c01968/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java index 762cce7..bf87c0a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java @@ -101,6 +101,10 @@ public final class JoinStatus { outputPosition = 0; } + public final void incOutputPos() { + outputPosition++; + } + public final void notifyLeftRepeating() { leftRepeating = true; 
outputBatch.resetBatchBuilder(); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d3c01968/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java index 7c8a51c..f43934e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java @@ -95,8 +95,10 @@ public abstract class JoinTemplate implements JoinWorker { if (((MergeJoinPOP)status.outputBatch.getPopConfig()).getJoinType() == JoinRelType.LEFT) { // we've hit the end of the right record batch; copy any remaining values from the left batch while (status.isLeftPositionAllowed()) { - if (!doCopyLeft(status.getLeftPosition(), status.fetchAndIncOutputPos())) + if (!doCopyLeft(status.getLeftPosition(), status.getOutPosition())) return false; + + status.incOutputPos(); status.advanceLeft(); } } @@ -110,10 +112,11 @@ public abstract class JoinTemplate implements JoinWorker { case -1: // left key < right key - if (((MergeJoinPOP)status.outputBatch.getPopConfig()).getJoinType() == JoinRelType.LEFT) - if (!doCopyLeft(status.getLeftPosition(), status.fetchAndIncOutputPos())) { + if (((MergeJoinPOP)status.outputBatch.getPopConfig()).getJoinType() == JoinRelType.LEFT) { + if (!doCopyLeft(status.getLeftPosition(), status.getOutPosition())) return false; - } + status.incOutputPos(); + } status.advanceLeft(); continue; @@ -140,9 +143,11 @@ public abstract class JoinTemplate implements JoinWorker { if (!doCopyLeft(status.getLeftPosition(), status.getOutPosition())) return false; - if (!doCopyRight(status.getRightPosition(), status.fetchAndIncOutputPos())) + if (!doCopyRight(status.getRightPosition(), 
status.getOutPosition())) return false; + status.incOutputPos(); + // If the left key has duplicates and we're about to cross a boundary in the right batch, add the // right table's record batch to the sv4 builder before calling next. These records will need to be // copied again for each duplicate left key. http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d3c01968/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java index bd668e7..7680ff9 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java @@ -395,16 +395,20 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> { private void allocateBatch() { // allocate new batch space. container.clear(); - // add fields from both batches + + //estimation of joinBatchSize : max of left/right size, expanded by a factor of 16, which is then bounded by MAX_BATCH_SIZE. 
+ int joinBatchSize = Math.min(Math.max(left.getRecordCount() , right.getRecordCount() ) * 16, MAX_BATCH_SIZE); + + // add fields from both batches for (VectorWrapper<?> w : left) { ValueVector outgoingVector = TypeHelper.getNewVector(w.getField(), context.getAllocator()); - VectorAllocator.getAllocator(outgoingVector, (int) Math.ceil(w.getValueVector().getBufferSize() / left.getRecordCount())).alloc(left.getRecordCount() * 16); + VectorAllocator.getAllocator(outgoingVector, (int) Math.ceil(w.getValueVector().getBufferSize() / left.getRecordCount())).alloc(joinBatchSize); container.add(outgoingVector); } for (VectorWrapper<?> w : right) { ValueVector outgoingVector = TypeHelper.getNewVector(w.getField(), context.getAllocator()); - VectorAllocator.getAllocator(outgoingVector, (int) Math.ceil(w.getValueVector().getBufferSize() / right.getRecordCount())).alloc(right.getRecordCount() * 16); + VectorAllocator.getAllocator(outgoingVector, (int) Math.ceil(w.getValueVector().getBufferSize() / right.getRecordCount())).alloc(joinBatchSize); container.add(outgoingVector); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d3c01968/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java index 8552465..379fad2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java @@ -105,6 +105,10 @@ public class IteratorValidatorBatchIterator implements RecordBatch{ public IterOutcome next() { if(state == IterOutcome.NONE ) throw new 
IllegalStateException("The incoming iterator has previously moved to a state of NONE. You should not be attempting to call next() again."); state = incoming.next(); + + if ((state == IterOutcome.OK || state == IterOutcome.OK_NEW_SCHEMA) && incoming.getRecordCount() > MAX_BATCH_SIZE) + throw new IllegalStateException (String.format("Incoming batch of %s has size %d, which is beyond the limit of %d", incoming.getClass().getName(), incoming.getRecordCount(), MAX_BATCH_SIZE)); + return state; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d3c01968/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java index b41b733..b77a6a8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java @@ -33,6 +33,9 @@ import org.apache.drill.exec.vector.ValueVector; */ public interface RecordBatch extends VectorAccessible { + /* max batch size, limited by 2-byte-length in SV2 : 65535 = 2^16 -1 */ + public static final int MAX_BATCH_SIZE = 65535; + /** * Describes the outcome of a RecordBatch being incremented forward. 
*/ http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d3c01968/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoin.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoin.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoin.java index 27fae08..6e681e1 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoin.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoin.java @@ -42,6 +42,7 @@ import org.apache.drill.exec.proto.CoordinationProtos; import org.apache.drill.exec.proto.ExecProtos; import org.apache.drill.exec.proto.BitControl.PlanFragment; import org.apache.drill.exec.rpc.user.UserServer; +import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection; import org.apache.drill.exec.server.DrillbitContext; import org.apache.drill.exec.store.StoragePluginRegistry; import org.apache.drill.exec.vector.ValueVector; @@ -266,6 +267,31 @@ public class TestMergeJoin { } + @Test + public void testJoinBatchSize(@Injectable final DrillbitContext bitContext, @Injectable UserClientConnection connection) throws Throwable{ + new NonStrictExpectations(){{ + bitContext.getMetrics(); result = new MetricRegistry(); + bitContext.getAllocator(); result = new TopLevelAllocator();; + bitContext.getConfig(); result = c; + bitContext.getOperatorCreatorRegistry(); result = new OperatorCreatorRegistry(c); + }}; + + PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance()); + PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile("/join/join_batchsize.json"), Charsets.UTF_8)); + FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c); + FragmentContext context = new 
FragmentContext(bitContext, PlanFragment.getDefaultInstance(), connection, registry); + SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next())); + while(exec.next()){ + assertEquals(100, exec.getRecordCount()); + } + + if(context.getFailureCause() != null){ + throw context.getFailureCause(); + } + assertTrue(!context.isFailed()); + + } + @AfterClass public static void tearDown() throws Exception{ // pause to get logger to catch up. http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d3c01968/exec/java-exec/src/test/resources/join/join_batchsize.json ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/resources/join/join_batchsize.json b/exec/java-exec/src/test/resources/join/join_batchsize.json new file mode 100644 index 0000000..a3be355 --- /dev/null +++ b/exec/java-exec/src/test/resources/join/join_batchsize.json @@ -0,0 +1,88 @@ +{ + head:{ + type:"APACHE_DRILL_PHYSICAL", + version:"1", + generator:{ + type:"manual" + } + }, + graph:[ + { + @id:1, + pop:"mock-sub-scan", + url: "http://source1.apache.org", + entries:[ + {records: 100, types: [ + {name: "blue", type: "INT", mode: "REQUIRED"}, + {name: "red", type: "INT", mode: "REQUIRED"}, + {name: "green", type: "INT", mode: "REQUIRED"} + ]} + ] + }, + { + pop : "sort", + @id : 2, + child : 1, + orderings : [ { + order : "ASC", + expr : "blue" + } ], + reverse : false + }, + { + pop : "selection-vector-remover", + @id : 3, + child : 2 + }, + { + @id:4, + pop:"mock-sub-scan", + url: "http://source2.apache.org", + entries:[ + {records: 2, types: [ + {name: "blue1", type: "INT", mode: "REQUIRED"}, + {name: "red1", type: "INT", mode: "REQUIRED"}, + {name: "green1", type: "INT", mode: "REQUIRED"} + ]} + ] + }, + { + pop : "sort", + @id : 5, + child : 4, + orderings : [ { + order : "ASC", + expr : "blue1" + } ], + reverse : false + }, + { + pop : "selection-vector-remover", + 
@id : 6, + child : 5 + }, + { + @id: 7, + right: 6, + left: 3, + pop: "merge-join", + join-conditions: [ {relationship: "==", left: "blue", right: "blue1"} ] + }, + { + pop : "limit", + @id : 8, + child : 7, + first : 0, + last : 100 + }, { + pop : "selection-vector-remover", + @id : 9, + child : 8 + }, + { + @id: 10, + child: 9, + pop: "screen" + } + ] +}
