DRILL-1092: Ensure we have enough records while performing merge and spill in 
ExternalSortBatch


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/5080585b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/5080585b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/5080585b

Branch: refs/heads/master
Commit: 5080585b6e2f69f49931e860de1cd4e27d08af2a
Parents: 19dbb02
Author: Mehant Baid <[email protected]>
Authored: Mon Jul 14 23:28:04 2014 -0700
Committer: Jacques Nadeau <[email protected]>
Committed: Sun Jul 20 16:28:02 2014 -0700

----------------------------------------------------------------------
 .../exec/physical/impl/xsort/ExternalSortBatch.java      | 11 +++++++----
 .../exec/physical/impl/xsort/PriorityQueueCopier.java    |  2 +-
 .../physical/impl/xsort/PriorityQueueCopierTemplate.java |  7 ++++++-
 3 files changed, 14 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5080585b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
index 237a631..e5da896 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
@@ -339,6 +339,7 @@ public class ExternalSortBatch extends 
AbstractRecordBatch<ExternalSort> {
     VectorContainer hyperBatch = new VectorContainer();
     VectorContainer outputContainer = new VectorContainer();
     List<BatchGroup> batchGroupList = Lists.newArrayList();
+    int recordCount = 0;
     for (int i = 0; i < SPILL_BATCH_GROUP_SIZE; i++) {
       if (batchGroups.size() == 0) {
         break;
@@ -346,13 +347,15 @@ public class ExternalSortBatch extends 
AbstractRecordBatch<ExternalSort> {
       if (batchGroups.peekLast().getSecondContainer() != null) {
         break;
       }
-      batchGroupList.add(batchGroups.pollLast());
+      BatchGroup batch = batchGroups.pollLast();
+      recordCount += batch.getSv2().getCount();
+      batchGroupList.add(batch);
     }
     if (batchGroupList.size() == 0) {
       return;
     }
     constructHyperBatch(batchGroupList, hyperBatch);
-    createCopier(hyperBatch, batchGroupList, outputContainer);
+    createCopier(hyperBatch, batchGroupList, outputContainer, recordCount);
 
     int count = copier.next();
     assert count > 0;
@@ -535,7 +538,7 @@ public class ExternalSortBatch extends 
AbstractRecordBatch<ExternalSort> {
     }
   }
 
-  private void createCopier(VectorAccessible batch, List<BatchGroup> 
batchGroupList, VectorContainer outputContainer) throws SchemaChangeException {
+  private void createCopier(VectorAccessible batch, List<BatchGroup> 
batchGroupList, VectorContainer outputContainer, int recordCount) throws 
SchemaChangeException {
     try {
       if (copier == null) {
         CodeGenerator<PriorityQueueCopier> cg = 
CodeGenerator.get(PriorityQueueCopier.TEMPLATE_DEFINITION, 
context.getFunctionRegistry());
@@ -555,7 +558,7 @@ public class ExternalSortBatch extends 
AbstractRecordBatch<ExternalSort> {
         outputContainer.add(v);
         allocators.add(VectorAllocator.getAllocator(v, 110));
       }
-      copier.setup(context, copierAllocator, batch, batchGroupList, 
outputContainer, allocators);
+      copier.setup(context, copierAllocator, batch, batchGroupList, 
outputContainer, allocators, recordCount);
     } catch (ClassTransformationException e) {
       throw new RuntimeException(e);
     } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5080585b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopier.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopier.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopier.java
index 7122963..f2da717 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopier.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopier.java
@@ -32,7 +32,7 @@ import org.apache.drill.exec.vector.allocator.VectorAllocator;
 import java.util.List;
 
 public interface PriorityQueueCopier {
-  public void setup(FragmentContext context, BufferAllocator allocator, 
VectorAccessible hyperBatch, List<BatchGroup> batchGroups, VectorAccessible 
outgoing, List<VectorAllocator> allocators) throws SchemaChangeException;
+  public void setup(FragmentContext context, BufferAllocator allocator, 
VectorAccessible hyperBatch, List<BatchGroup> batchGroups, VectorAccessible 
outgoing, List<VectorAllocator> allocators, int recordCnt) throws 
SchemaChangeException;
   public int next();
   public List<VectorAllocator> getAllocators();
   public void cleanup();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5080585b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java
index a74f14c..6e9c355 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java
@@ -44,7 +44,7 @@ public abstract class PriorityQueueCopierTemplate implements 
PriorityQueueCopier
   private int targetRecordCount = ExternalSortBatch.SPILL_TARGET_RECORD_COUNT;
 
   @Override
-  public void setup(FragmentContext context, BufferAllocator allocator, 
VectorAccessible hyperBatch, List<BatchGroup> batchGroups, VectorAccessible 
outgoing, List<VectorAllocator> allocators) throws SchemaChangeException {
+  public void setup(FragmentContext context, BufferAllocator allocator, 
VectorAccessible hyperBatch, List<BatchGroup> batchGroups, VectorAccessible 
outgoing, List<VectorAllocator> allocators, int recordCnt) throws 
SchemaChangeException {
     this.context = context;
     this.allocator = allocator;
     this.hyperBatch = hyperBatch;
@@ -64,6 +64,11 @@ public abstract class PriorityQueueCopierTemplate implements 
PriorityQueueCopier
       siftUp();
       queueSize++;
     }
+
+    // Check if we have enough records to create BatchData with two 
containers.
+    if (recordCnt < (2 * targetRecordCount)) {
+      targetRecordCount = (recordCnt / 2);
+    }
   }
 
   @Override

Reply via email to