DRILL-1092: Ensure we have enough records while performing merge and spill in ExternalSortBatch
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/5080585b Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/5080585b Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/5080585b Branch: refs/heads/master Commit: 5080585b6e2f69f49931e860de1cd4e27d08af2a Parents: 19dbb02 Author: Mehant Baid <[email protected]> Authored: Mon Jul 14 23:28:04 2014 -0700 Committer: Jacques Nadeau <[email protected]> Committed: Sun Jul 20 16:28:02 2014 -0700 ---------------------------------------------------------------------- .../exec/physical/impl/xsort/ExternalSortBatch.java | 11 +++++++---- .../exec/physical/impl/xsort/PriorityQueueCopier.java | 2 +- .../physical/impl/xsort/PriorityQueueCopierTemplate.java | 7 ++++++- 3 files changed, 14 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5080585b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java index 237a631..e5da896 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java @@ -339,6 +339,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { VectorContainer hyperBatch = new VectorContainer(); VectorContainer outputContainer = new VectorContainer(); List<BatchGroup> batchGroupList = Lists.newArrayList(); + int recordCount = 0; for (int i = 0; i < SPILL_BATCH_GROUP_SIZE; i++) { if (batchGroups.size() == 0) { break; @@ -346,13 
+347,15 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { if (batchGroups.peekLast().getSecondContainer() != null) { break; } - batchGroupList.add(batchGroups.pollLast()); + BatchGroup batch = batchGroups.pollLast(); + recordCount += batch.getSv2().getCount(); + batchGroupList.add(batch); } if (batchGroupList.size() == 0) { return; } constructHyperBatch(batchGroupList, hyperBatch); - createCopier(hyperBatch, batchGroupList, outputContainer); + createCopier(hyperBatch, batchGroupList, outputContainer, recordCount); int count = copier.next(); assert count > 0; @@ -535,7 +538,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { } } - private void createCopier(VectorAccessible batch, List<BatchGroup> batchGroupList, VectorContainer outputContainer) throws SchemaChangeException { + private void createCopier(VectorAccessible batch, List<BatchGroup> batchGroupList, VectorContainer outputContainer, int recordCount) throws SchemaChangeException { try { if (copier == null) { CodeGenerator<PriorityQueueCopier> cg = CodeGenerator.get(PriorityQueueCopier.TEMPLATE_DEFINITION, context.getFunctionRegistry()); @@ -555,7 +558,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { outputContainer.add(v); allocators.add(VectorAllocator.getAllocator(v, 110)); } - copier.setup(context, copierAllocator, batch, batchGroupList, outputContainer, allocators); + copier.setup(context, copierAllocator, batch, batchGroupList, outputContainer, allocators, recordCount); } catch (ClassTransformationException e) { throw new RuntimeException(e); } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5080585b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopier.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopier.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopier.java index 7122963..f2da717 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopier.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopier.java @@ -32,7 +32,7 @@ import org.apache.drill.exec.vector.allocator.VectorAllocator; import java.util.List; public interface PriorityQueueCopier { - public void setup(FragmentContext context, BufferAllocator allocator, VectorAccessible hyperBatch, List<BatchGroup> batchGroups, VectorAccessible outgoing, List<VectorAllocator> allocators) throws SchemaChangeException; + public void setup(FragmentContext context, BufferAllocator allocator, VectorAccessible hyperBatch, List<BatchGroup> batchGroups, VectorAccessible outgoing, List<VectorAllocator> allocators, int recordCnt) throws SchemaChangeException; public int next(); public List<VectorAllocator> getAllocators(); public void cleanup(); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5080585b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java index a74f14c..6e9c355 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java @@ -44,7 +44,7 @@ public abstract class PriorityQueueCopierTemplate implements PriorityQueueCopier private int targetRecordCount = ExternalSortBatch.SPILL_TARGET_RECORD_COUNT; @Override - public void setup(FragmentContext context, BufferAllocator allocator, 
VectorAccessible hyperBatch, List<BatchGroup> batchGroups, VectorAccessible outgoing, List<VectorAllocator> allocators) throws SchemaChangeException { + public void setup(FragmentContext context, BufferAllocator allocator, VectorAccessible hyperBatch, List<BatchGroup> batchGroups, VectorAccessible outgoing, List<VectorAllocator> allocators, int recordCnt) throws SchemaChangeException { this.context = context; this.allocator = allocator; this.hyperBatch = hyperBatch; @@ -64,6 +64,11 @@ public abstract class PriorityQueueCopierTemplate implements PriorityQueueCopier siftUp(); queueSize++; } + + // Check if we have enough records to create BatchData with two containers. + if (recordCnt < (2 * targetRecordCount)) { + targetRecordCount = (recordCnt / 2); + } } @Override
