Repository: drill
Updated Branches:
  refs/heads/master 030189f90 -> a03f5429e


DRILL-5839: Handle Empty Batches in Merge Receiver

closes #974
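
In short: the merge receiver previously asserted that every sender delivered at least one batch carrying a schema. With this change it saves the field list of the first schema-bearing batch it sees and, for any sender that delivered only null or schema-less batches, substitutes a zero-row placeholder built from that saved schema. The snippet below is a condensed sketch of that idea, not the literal patch; savedFieldList is an illustrative name, while RecordBatchDef, FragmentRecordBatch, RawFragmentBatch and createDummyFieldList() are the classes and helper used in the diff that follows.

    // Sketch only. Assumes org.apache.drill.exec.proto.UserBitShared / BitData
    // and a field list saved from a batch that actually carried a schema.
    UserBitShared.RecordBatchDef dummyDef = UserBitShared.RecordBatchDef.newBuilder()
        .addAllField(createDummyFieldList(savedFieldList)) // zero-count copies of each field
        .setRecordCount(0)
        .build();

    BitData.FragmentRecordBatch dummyHeader = BitData.FragmentRecordBatch.newBuilder()
        .setIsLastBatch(true)
        .setDef(dummyDef)
        .build();

    // Every null or schema-less entry in rawBatches is replaced with this placeholder.
    RawFragmentBatch placeholder = new RawFragmentBatch(dummyHeader, null, null);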


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/a03f5429
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/a03f5429
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/a03f5429

Branch: refs/heads/master
Commit: a03f5429e368cf73286eec6101871f6e61a5b7d1
Parents: 030189f
Author: Padma Penumarthy <ppenuma...@yahoo.com>
Authored: Sat Sep 30 17:33:17 2017 -0700
Committer: Paul Rogers <prog...@maprtech.com>
Committed: Thu Oct 5 23:25:40 2017 -0700

----------------------------------------------------------------------
 .../impl/mergereceiver/MergingRecordBatch.java  | 102 +++++++++++++++++--
 .../drill/exec/store/mock/MockRecordReader.java |  12 +++
 .../impl/mergereceiver/TestMergingReceiver.java |  79 ++++++++++++++
 .../mergerecv/empty_batch_noschema.json         |  51 ++++++++++
 .../multiple_providers_empty_batches.json       |  67 ++++++++++++
 5 files changed, 300 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/a03f5429/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
index ff3ac91..35c1f81 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
@@ -22,6 +22,7 @@ import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.PriorityQueue;
+import java.util.ArrayList;
 
 import org.apache.calcite.rel.RelFieldCollation.Direction;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
@@ -45,6 +46,7 @@ import org.apache.drill.exec.ops.MetricDef;
 import org.apache.drill.exec.physical.MinorFragmentEndpoint;
 import org.apache.drill.exec.physical.config.MergingReceiverPOP;
 import org.apache.drill.exec.proto.BitControl.FinishedReceiver;
+import org.apache.drill.exec.proto.BitData;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
 import org.apache.drill.exec.proto.UserBitShared;
@@ -188,6 +190,9 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
       return IterOutcome.NONE;
     }
 
+    List<UserBitShared.SerializedField> fieldList = null;
+    boolean createDummyBatch = false;
+
     // lazy initialization
     if (!hasRun) {
       schemaChanged = true; // first iteration is always a schema change
@@ -214,14 +219,23 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
           return IterOutcome.STOP;
         }
 
-        assert rawBatch != null : "rawBatch is null although context.shouldContinue() == true";
+        // If rawBatch is null, go ahead and add it to the list. We will create dummy batches
+        // for all null batches later.
+        if (rawBatch == null) {
+          createDummyBatch = true;
+          rawBatches.add(rawBatch);
+          continue;
+        }
+
+        if (fieldList == null && rawBatch.getHeader().getDef().getFieldCount() != 0) {
+          // save the schema to fix up empty batches with no schema if needed.
+            fieldList = rawBatch.getHeader().getDef().getFieldList();
+        }
+
         if (rawBatch.getHeader().getDef().getRecordCount() != 0) {
           rawBatches.add(rawBatch);
         } else {
-          // save an empty batch to use for schema purposes. ignore batch if it contains no fields, and thus no schema
-          if (emptyBatch == null && rawBatch.getHeader().getDef().getFieldCount() != 0) {
-            emptyBatch = rawBatch;
-          }
+          // keep reading till we get a batch with record count > 0 or we have no more batches to read, i.e. we get null
           try {
             while ((rawBatch = getNext(p)) != null && rawBatch.getHeader().getDef().getRecordCount() == 0) {
               // Do nothing
@@ -235,15 +249,48 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
             clearBatches(rawBatches);
             return IterOutcome.STOP;
           }
-          if (rawBatch != null) {
-            rawBatches.add(rawBatch);
-          } else {
-            rawBatches.add(emptyBatch);
+          if (rawBatch == null || rawBatch.getHeader().getDef().getFieldCount() == 0) {
+            createDummyBatch = true;
           }
+          // Even if rawBatch is null, go ahead and add it to the list.
+          // We will create dummy batches for all null batches later.
+          rawBatches.add(rawBatch);
         }
         p++;
       }
 
+      // If no batch arrived with schema from any of the providers, just return NONE.
+      if (fieldList == null) {
+        return IterOutcome.NONE;
+      }
+
+      // Go through and fix schema for empty batches.
+      if (createDummyBatch) {
+        // Create dummy record batch definition with 0 record count
+        UserBitShared.RecordBatchDef dummyDef = UserBitShared.RecordBatchDef.newBuilder()
+            // we cannot use/modify the original field list as that is used by
+            // valid record batch.
+            // create a copy of field list with valuecount = 0 for all fields.
+            // This is for dummy schema generation.
+            .addAllField(createDummyFieldList(fieldList))
+            .setRecordCount(0)
+            .build();
+
+        // Create dummy header
+        BitData.FragmentRecordBatch dummyHeader = BitData.FragmentRecordBatch.newBuilder()
+            .setIsLastBatch(true)
+            .setDef(dummyDef)
+            .build();
+
+        for (int i = 0; i < p; i++) {
+          RawFragmentBatch rawBatch = rawBatches.get(i);
+          if (rawBatch == null || rawBatch.getHeader().getDef().getFieldCount() == 0) {
+            rawBatch = new RawFragmentBatch(dummyHeader, null, null);
+            rawBatches.set(i, rawBatch);
+          }
+        }
+      }
+
       // allocate the incoming record batch loaders
       senderCount = rawBatches.size();
       incomingBatches = new RawFragmentBatch[senderCount];
@@ -435,6 +482,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
       if (prevBatchWasFull) {
         break;
       }
+
     }
 
     // set the value counts in the outgoing vectors
@@ -454,6 +502,39 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
     }
   }
 
+  // Create dummy field that will be used for empty batches.
+  private UserBitShared.SerializedField createDummyField(UserBitShared.SerializedField field) {
+    UserBitShared.SerializedField.Builder newDummyFieldBuilder = UserBitShared.SerializedField.newBuilder()
+        .setVarByteLength(0)
+        .setBufferLength(0)
+        .setValueCount(0)
+        .setNamePart(field.getNamePart())
+        .setMajorType(field.getMajorType());
+
+    int index = 0;
+    for (UserBitShared.SerializedField childField : field.getChildList()) {
+      // make sure we make a copy of all children, so we do not corrupt the
+      // original fieldList. This will recursively call itself.
+      newDummyFieldBuilder.addChild(index, createDummyField(childField));
+      index++;
+    }
+
+    UserBitShared.SerializedField newDummyField = newDummyFieldBuilder.build();
+
+    return newDummyField;
+  }
+
+  // Create a dummy field list that we can use for empty batches.
+  private List<UserBitShared.SerializedField> createDummyFieldList(List<UserBitShared.SerializedField> fieldList) {
+    List<UserBitShared.SerializedField> dummyFieldList = new ArrayList<UserBitShared.SerializedField>();
+
+    for (UserBitShared.SerializedField field : fieldList) {
+      dummyFieldList.add(createDummyField(field));
+    }
+
+    return dummyFieldList;
+  }
+
   @Override
   public FragmentContext getContext() {
     return context;
@@ -482,7 +563,6 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
           } else {
             state = BatchState.DONE;
           }
-
           break;
         }
         if (batch.getHeader().getDef().getFieldCount() == 0) {
@@ -640,7 +720,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
       final CodeGenerator<MergingReceiverGeneratorBase> cg = CodeGenerator.get(MergingReceiverGeneratorBase.TEMPLATE_DEFINITION, context.getFunctionRegistry(), context.getOptions());
       cg.plainJavaCapable(true);
       // Uncomment out this line to debug the generated code.
-//      cg.saveCodeForDebugging(true);
+      // cg.saveCodeForDebugging(true);
       final ClassGenerator<MergingReceiverGeneratorBase> g = cg.getRoot();
 
       ExpandableHyperContainer batch = null;
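
For readers skimming the diff, here is a stand-alone illustration (not part of the commit) of what createDummyField() produces for a single column; the protobuf setters are the same ones used above, and the concrete values are invented for the example.

    // Hypothetical input: an OPTIONAL BIGINT column "blue" as serialized by a
    // non-empty batch (uses org.apache.drill.exec.proto.UserBitShared and
    // org.apache.drill.common.types.TypeProtos).
    UserBitShared.SerializedField original = UserBitShared.SerializedField.newBuilder()
        .setNamePart(UserBitShared.NamePart.newBuilder().setName("blue"))
        .setMajorType(TypeProtos.MajorType.newBuilder()
            .setMinorType(TypeProtos.MinorType.BIGINT)
            .setMode(TypeProtos.DataMode.OPTIONAL))
        .setValueCount(4096)
        .setBufferLength(32768)
        .build();

    // Inside MergingRecordBatch, createDummyField(original) yields a copy that
    // keeps the name, type and children but reports zero counts, so a sender
    // with no data can still advertise the full schema.
    UserBitShared.SerializedField dummy = createDummyField(original);
    assert dummy.getValueCount() == 0 && dummy.getBufferLength() == 0;
    assert dummy.getNamePart().getName().equals("blue");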

http://git-wip-us.apache.org/repos/asf/drill/blob/a03f5429/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java
index 2d9973e..108a621 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java
@@ -51,6 +51,10 @@ public class MockRecordReader extends AbstractRecordReader {
   }
 
   private int getEstimatedRecordSize(MockColumn[] types) {
+    if (types == null) {
+      return 0;
+    }
+
     int x = 0;
     for (int i = 0; i < types.length; i++) {
       x += TypeHelper.getSize(types[i].getMajorType());
@@ -68,6 +72,9 @@ public class MockRecordReader extends AbstractRecordReader {
   public void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException {
     try {
       final int estimateRowSize = getEstimatedRecordSize(config.getTypes());
+      if (config.getTypes() == null) {
+        return;
+      }
       valueVectors = new ValueVector[config.getTypes().length];
       batchRecordCount = 250000 / estimateRowSize;
 
@@ -90,6 +97,11 @@ public class MockRecordReader extends AbstractRecordReader {
 
     final int recordSetSize = Math.min(batchRecordCount, this.config.getRecords() - recordsRead);
     recordsRead += recordSetSize;
+
+    if (valueVectors == null) {
+      return recordSetSize;
+    }
+
     for (final ValueVector v : valueVectors) {
       final ValueVector.Mutator m = v.getMutator();
       m.generateTestData(recordSetSize);

http://git-wip-us.apache.org/repos/asf/drill/blob/a03f5429/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/mergereceiver/TestMergingReceiver.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/mergereceiver/TestMergingReceiver.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/mergereceiver/TestMergingReceiver.java
index cb2995c..f0aa1b7 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/mergereceiver/TestMergingReceiver.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/mergereceiver/TestMergingReceiver.java
@@ -152,4 +152,83 @@ public class TestMergingReceiver extends PopUnitTestBase {
       assertEquals(100000, count);
     }
   }
+
+  @Test
+  public void handleEmptyBatchNoSchema() throws Exception {
+    @SuppressWarnings("resource")
+    final RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
+
+    try (final Drillbit bit1 = new Drillbit(CONFIG, serviceSet);
+         final Drillbit bit2 = new Drillbit(CONFIG, serviceSet);
+         final DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator());) {
+
+      bit1.run();
+      bit2.run();
+      client.connect();
+      final List<QueryDataBatch> results =
+          client.runQuery(org.apache.drill.exec.proto.UserBitShared.QueryType.PHYSICAL,
+              Files.toString(FileUtils.getResourceAsFile("/mergerecv/empty_batch_noschema.json"),
+                  Charsets.UTF_8));
+      int count = 0;
+      final RecordBatchLoader batchLoader = new RecordBatchLoader(client.getAllocator());
+      // count the rows in the results
+      for (final QueryDataBatch b : results) {
+        final QueryData queryData = b.getHeader();
+        batchLoader.load(queryData.getDef(), b.getData()); // loaded but not used, for testing
+        count += queryData.getRowCount();
+        b.release();
+        batchLoader.clear();
+      }
+      assertEquals(100000, count);
+    }
+  }
+
+  @Test
+  public void testMultipleProvidersEmptyBatches() throws Exception {
+    @SuppressWarnings("resource")
+    final RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
+
+    try (final Drillbit bit1 = new Drillbit(CONFIG, serviceSet);
+         final Drillbit bit2 = new Drillbit(CONFIG, serviceSet);
+         final DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator())) {
+
+      bit1.run();
+      bit2.run();
+      client.connect();
+      final List<QueryDataBatch> results =
+          client.runQuery(org.apache.drill.exec.proto.UserBitShared.QueryType.PHYSICAL,
+              Files.toString(FileUtils.getResourceAsFile("/mergerecv/multiple_providers_empty_batches.json"),
+                  Charsets.UTF_8));
+      int count = 0;
+      final RecordBatchLoader batchLoader = new RecordBatchLoader(client.getAllocator());
+      // count the rows and verify that "blue" values arrive in ascending order
+      Long lastBlueValue = null;
+      for (final QueryDataBatch b : results) {
+        final QueryData queryData = b.getHeader();
+        final int batchRowCount = queryData.getRowCount();
+        count += batchRowCount;
+        batchLoader.load(queryData.getDef(), b.getData());
+        for (final VectorWrapper<?> vw : batchLoader) {
+          @SuppressWarnings("resource")
+          final ValueVector vv = vw.getValueVector();
+          final ValueVector.Accessor va = vv.getAccessor();
+          final MaterializedField materializedField = vv.getField();
+          final int numValues = va.getValueCount();
+          for (int valueIdx = 0; valueIdx < numValues; ++valueIdx) {
+            if (materializedField.getName().equals("blue")) {
+              final long longValue = (Long) va.getObject(valueIdx);
+              // check that order is ascending
+              if (lastBlueValue != null) {
+                assertTrue(longValue >= lastBlueValue);
+              }
+              lastBlueValue = longValue;
+            }
+          }
+        }
+        b.release();
+        batchLoader.clear();
+      }
+      assertEquals(300000, count);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/a03f5429/exec/java-exec/src/test/resources/mergerecv/empty_batch_noschema.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/mergerecv/empty_batch_noschema.json b/exec/java-exec/src/test/resources/mergerecv/empty_batch_noschema.json
new file mode 100644
index 0000000..6bf7b24
--- /dev/null
+++ b/exec/java-exec/src/test/resources/mergerecv/empty_batch_noschema.json
@@ -0,0 +1,51 @@
+{
+  head:{
+    type:"APACHE_DRILL_PHYSICAL",
+    version:"1",
+    generator:{
+        type:"manual"
+    }
+  },
+  graph:[
+    {
+      @id:1,
+      pop:"mock-scan",
+      url: "http://apache.org",
+      entries:[
+        {records: 100000, types: [
+          {name: "blue", type: "BIGINT", mode: "OPTIONAL"},
+          {name: "red", type: "BIGINT", mode: "OPTIONAL"},
+          {name: "green", type: "BIGINT", mode: "OPTIONAL"}
+        ]},
+        {records: 0},
+        {records: 0, types: [
+          {name: "blue", type: "BIGINT", mode: "OPTIONAL"},
+          {name: "red", type: "BIGINT", mode: "OPTIONAL"},
+          {name: "green", type: "BIGINT", mode: "OPTIONAL"}
+        ]}
+      ]
+    },
+    {
+      @id: 2,
+      child: 1,
+      pop: "external-sort",
+      orderings: [ {expr: "blue", order:"DESC"},  {expr: "red", order:"DESC"} ]
+    },
+    {
+      @id: 3,
+      child: 2,
+      pop:"selection-vector-remover"
+    },
+    {
+      @id: 4,
+      child: 3,
+      pop: "single-merge-exchange",
+      orderings: [ {expr: "blue", order:"DESC"},  {expr: "red", order:"DESC"} ]
+    },
+    {
+      @id: 5,
+      child: 4,
+      pop: "screen"
+    }
+  ]
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/a03f5429/exec/java-exec/src/test/resources/mergerecv/multiple_providers_empty_batches.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/mergerecv/multiple_providers_empty_batches.json b/exec/java-exec/src/test/resources/mergerecv/multiple_providers_empty_batches.json
new file mode 100644
index 0000000..5d2feb8
--- /dev/null
+++ b/exec/java-exec/src/test/resources/mergerecv/multiple_providers_empty_batches.json
@@ -0,0 +1,67 @@
+{
+  head:{
+    type:"APACHE_DRILL_PHYSICAL",
+    version:"1",
+    generator:{
+        type:"manual"
+    }
+  },
+  graph:[
+    {
+      @id:1,
+      pop:"mock-scan",
+      url: "http://apache.org",
+
+      entries:[
+        {records: 100000, types: [
+          {name: "blue", type: "BIGINT", mode: "REQUIRED", repeat: 2},
+          {name: "red", type: "INT", mode: "REQUIRED"},
+          {name: "green", type: "INT", mode: "REQUIRED",
+            properties: { a: 10, b: "foo" }}
+        ]},
+        {records: 100000, types: [
+          {name: "blue", type: "BIGINT", mode: "REQUIRED", repeat: 2},
+          {name: "red", type: "INT", mode: "REQUIRED"},
+          {name: "green", type: "INT", mode: "REQUIRED",
+            properties: { a: 10, b: "foo" }}
+        ]},
+        {records: 0},
+        {records: 0, types: [
+          {name: "blue", type: "BIGINT", mode: "REQUIRED", repeat: 2},
+          {name: "red", type: "INT", mode: "REQUIRED"},
+          {name: "green", type: "INT", mode: "REQUIRED",
+            properties: { a: 10, b: "foo" }}
+        ]},
+        {records: 100000, types: [
+          {name: "blue", type: "BIGINT", mode: "REQUIRED", repeat: 2},
+          {name: "red", type: "INT", mode: "REQUIRED"},
+          {name: "green", type: "INT", mode: "REQUIRED",
+            properties: { a: 10, b: "foo" }}
+        ]},
+        {records: 0}
+      ]
+    },
+    {
+      @id: 2,
+      child: 1,
+      pop: "external-sort",
+      orderings: [ {expr: "blue", order:"ASC"},  {expr: "red", order:"ASC"} ]
+    },
+    {
+      @id: 3,
+      child: 2,
+      pop:"selection-vector-remover"
+    },
+    {
+      @id: 4,
+      child: 3,
+      pop: "single-merge-exchange",
+      orderings: [ {expr: "blue", order:"ASC"},  {expr: "red", order:"ASC"}  ]
+    },
+    {
+      @id: 5,
+      child: 4,
+      pop: "screen"
+    }
+  ]
+}
\ No newline at end of file
