DRILL-1107: Fix regression from first patch

Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/810a2040
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/810a2040
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/810a2040

Branch: refs/heads/master
Commit: 810a20409089a10ae984807ff4418bfe7ee18957
Parents: 0b905fe
Author: Steven Phillips <[email protected]>
Authored: Mon Jul 7 14:26:00 2014 -0700
Committer: Jacques Nadeau <[email protected]>
Committed: Mon Jul 7 15:53:33 2014 -0700

----------------------------------------------------------------------
 .../impl/mergereceiver/MergingRecordBatch.java  | 28 +++++++++++++++++---
 1 file changed, 25 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/810a2040/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
index e83f461..914a187 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
@@ -187,6 +187,8 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
           }
           if (rawBatch != null) {
             rawBatches.add(rawBatch);
+          } else {
+            rawBatches.add(emptyBatch);
           }
         }
       }
@@ -288,8 +290,26 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
       });
 
       // populate the priority queue with initial values
-      for (int b = 0; b < senderCount; ++b)
-        pqueue.add(new Node(b, 0));
+      for (int b = 0; b < senderCount; ++b) {
+        while (batchLoaders[b] != null && batchLoaders[b].getRecordCount() == 0) {
+          try {
+            RawFragmentBatch batch = getNext(fragProviders[b]);
+            incomingBatches[b] = batch;
+            if (batch != null) {
+              batchLoaders[b].load(batch.getHeader().getDef(), batch.getBody());
+            } else {
+              batchLoaders[b].clear();
+              batchLoaders[b] = null;
+            }
+          } catch (IOException | SchemaChangeException e) {
+            context.fail(e);
+            return IterOutcome.STOP;
+          }
+        }
+        if (batchLoaders[b] != null) {
+          pqueue.add(new Node(b, 0));
+        }
+      }
 
       hasRun = true;
       // finished lazy initialization
@@ -566,7 +586,9 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
     outgoingContainer.clear();
     if (batchLoaders != null) {
       for(RecordBatchLoader rbl : batchLoaders){
-        rbl.clear();
+        if (rbl != null) {
+          rbl.clear();
+        }
       }
     }
     oContext.close();

Reply via email to