fix merging receiver
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/bf3fa660 Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/bf3fa660 Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/bf3fa660 Branch: refs/heads/master Commit: bf3fa6607290a4d5a976bf48a16927690a981da8 Parents: 02b2068 Author: Steven Phillips <[email protected]> Authored: Mon Mar 31 16:33:05 2014 -0700 Committer: Jacques Nadeau <[email protected]> Committed: Sat Apr 19 18:07:09 2014 -0700 ---------------------------------------------------------------------- .../exec/physical/impl/mergereceiver/MergingRecordBatch.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/bf3fa660/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java index 13ed4c9..ee2244e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java @@ -139,8 +139,9 @@ public class MergingRecordBatch implements RecordBatch { context.fail(e); return IterOutcome.STOP; } - if (rawBatch.getHeader().getDef().getRecordCount() != 0) + if (rawBatch.getHeader().getDef().getRecordCount() != 0) { rawBatches.add(rawBatch); + } } // allocate the incoming record batch loaders @@ -179,7 +180,7 @@ public class MergingRecordBatch implements RecordBatch { // allocate a new value vector ValueVector outgoingVector = TypeHelper.getNewVector(v.getField(), context.getAllocator()); - VectorAllocator allocator = VectorAllocator.getAllocator(v.getValueVector(), outgoingVector); + VectorAllocator allocator = VectorAllocator.getAllocator(outgoingVector, 50); allocator.alloc(DEFAULT_ALLOC_RECORD_COUNT); allocators.add(allocator); outgoingContainer.add(outgoingVector); @@ -371,6 +372,7 @@ public class MergingRecordBatch implements RecordBatch { final ClassGenerator<MergingReceiverGeneratorBase> cg = CodeGenerator.getRoot(MergingReceiverGeneratorBase.TEMPLATE_DEFINITION, context.getFunctionRegistry()); JExpression inIndex = JExpr.direct("inIndex"); + JExpression outIndex = JExpr.direct("outIndex"); JType valueVector2DArray = cg.getModel().ref(ValueVector.class).array().array(); JType valueVectorArray = cg.getModel().ref(ValueVector.class).array(); @@ -587,7 +589,7 @@ public class MergingRecordBatch implements RecordBatch { ((JExpression) JExpr.cast(vvClass, outgoingVectors.component(JExpr.lit(fieldIdx)))) .invoke("copyFrom") .arg(inIndex) - .arg(outgoingBatch.invoke("getRecordCount")) + .arg(outIndex) .arg(JExpr.cast(vvClass, ((JExpression) incomingVectors.component(JExpr.direct("inBatch"))) .component(JExpr.lit(fieldIdx)))));
