Repository: drill
Updated Branches:
  refs/heads/master 1f23b8962 -> b979bebe8


DRILL-4476: Allow UnionAllRecordBatch to manage situations where left input 
side or both sides come(s) from empty source(s).

close apache/drill#407


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/b979bebe
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/b979bebe
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/b979bebe

Branch: refs/heads/master
Commit: b979bebe83d7017880b0763adcbf8eb80acfcee8
Parents: 1f23b89
Author: Hsuan-Yi Chu <hsua...@usc.edu>
Authored: Fri Mar 4 13:50:02 2016 -0800
Committer: Aman Sinha <asi...@maprtech.com>
Committed: Sat Mar 12 20:38:09 2016 -0800

----------------------------------------------------------------------
 .../impl/union/UnionAllRecordBatch.java         | 174 ++++++++++++++++---
 .../java/org/apache/drill/TestUnionAll.java     | 126 +++++++++++++-
 .../org/apache/drill/TestUnionDistinct.java     | 116 +++++++++++++
 3 files changed, 389 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/b979bebe/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
index a8b8aeb..57e80d7 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
@@ -162,6 +162,25 @@ public class UnionAllRecordBatch extends 
AbstractRecordBatch<UnionAll> {
     allocationVectors = Lists.newArrayList();
     transfers.clear();
 
+    // If both sides of Union-All are empty
+    if(unionAllInput.isBothSideEmpty()) {
+      for(int i = 0; i < outputFields.size(); ++i) {
+        final String colName = outputFields.get(i).getPath();
+        final MajorType majorType = MajorType.newBuilder()
+            .setMinorType(MinorType.INT)
+            .setMode(DataMode.OPTIONAL)
+            .build();
+
+        MaterializedField outputField = MaterializedField.create(colName, 
majorType);
+        ValueVector vv = container.addOrGet(outputField, callBack);
+        allocationVectors.add(vv);
+      }
+
+      container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
+      return IterOutcome.OK_NEW_SCHEMA;
+    }
+
+
     final ClassGenerator<UnionAller> cg = 
CodeGenerator.getRoot(UnionAller.TEMPLATE_DEFINITION, 
context.getFunctionRegistry());
     int index = 0;
     for(VectorWrapper<?> vw : current) {
@@ -287,6 +306,7 @@ public class UnionAllRecordBatch extends 
AbstractRecordBatch<UnionAll> {
     // They are used to check if the schema is changed between recordbatches
     private BatchSchema leftSchema;
     private BatchSchema rightSchema;
+    private boolean bothEmpty = false;
 
     public UnionAllInput(UnionAllRecordBatch unionAllRecordBatch, RecordBatch 
left, RecordBatch right) {
       this.unionAllRecordBatch = unionAllRecordBatch;
@@ -294,13 +314,52 @@ public class UnionAllRecordBatch extends 
AbstractRecordBatch<UnionAll> {
       rightSide = new OneSideInput(right);
     }
 
+    private void setBothSideEmpty(boolean bothEmpty) {
+      this.bothEmpty = bothEmpty;
+    }
+
+    private boolean isBothSideEmpty() {
+      return bothEmpty;
+    }
+
     public IterOutcome nextBatch() throws SchemaChangeException {
       if(upstream == RecordBatch.IterOutcome.NOT_YET) {
         IterOutcome iterLeft = leftSide.nextBatch();
         switch(iterLeft) {
           case OK_NEW_SCHEMA:
-            break;
+            /*
+             * If the first few record batches are all empty,
+             * there is no way to tell whether these empty batches are coming 
from empty files.
+             * It is incorrect to infer output types when either side could be 
coming from empty.
+             *
+             * Thus, while-loop is necessary to skip those empty batches.
+             */
+            whileLoop:
+            while(leftSide.getRecordBatch().getRecordCount() == 0) {
+              iterLeft = leftSide.nextBatch();
+
+              switch(iterLeft) {
+                case STOP:
+                case OUT_OF_MEMORY:
+                  return iterLeft;
+
+                case NONE:
+                  // Special Case: The left side was an empty input.
+                  leftIsFinish = true;
+                  break whileLoop;
+
+                case NOT_YET:
+                case OK_NEW_SCHEMA:
+                case OK:
+                  continue whileLoop;
+
+                default:
+                  throw new IllegalStateException(
+                      String.format("Unexpected state %s.", iterLeft));
+              }
+            }
 
+            break;
           case STOP:
           case OUT_OF_MEMORY:
             return iterLeft;
@@ -315,26 +374,52 @@ public class UnionAllRecordBatch extends 
AbstractRecordBatch<UnionAll> {
           case OK_NEW_SCHEMA:
             // Unless there is no record batch on the left side of the inputs,
             // always start processing from the left side.
-            
unionAllRecordBatch.setCurrentRecordBatch(leftSide.getRecordBatch());
-
+            if(leftIsFinish) {
+              
unionAllRecordBatch.setCurrentRecordBatch(rightSide.getRecordBatch());
+            } else {
+              
unionAllRecordBatch.setCurrentRecordBatch(leftSide.getRecordBatch());
+            }
             // If the record count of the first batch from right input is zero,
             // there are two possibilities:
             // 1. The right side is an empty input (e.g., file).
             // 2. There will be more records carried by later batches.
-            if (rightSide.getRecordBatch().getRecordCount() == 0) {
-              iterRight = rightSide.nextBatch();
 
-              if (iterRight == IterOutcome.NONE) {
-                // Case 1: The right side was an empty input.
-                inferOutputFieldsFromLeftSide();
-                rightIsFinish = true;
-              } else {
-                // Case 2: There are more records carried by the latter 
batches.
-                inferOutputFields();
+            /*
+             * If the first few record batches are all empty,
+             * there is no way to tell whether these empty batches are coming 
from empty files.
+             * It is incorrect to infer output types when either side could be 
coming from empty.
+             *
+             * Thus, while-loop is necessary to skip those empty batches.
+             */
+            whileLoop:
+            while(rightSide.getRecordBatch().getRecordCount() == 0) {
+              iterRight = rightSide.nextBatch();
+              switch(iterRight) {
+                case STOP:
+                case OUT_OF_MEMORY:
+                  return iterRight;
+
+                case NONE:
+                  // Special Case: The right side was an empty input.
+                  rightIsFinish = true;
+                  break whileLoop;
+
+                case NOT_YET:
+                case OK_NEW_SCHEMA:
+                case OK:
+                  continue whileLoop;
+
+                default:
+                  throw new IllegalStateException(
+                      String.format("Unexpected state %s.", iterRight));
               }
-            } else {
-              inferOutputFields();
             }
+
+            if(leftIsFinish && rightIsFinish) {
+              setBothSideEmpty(true);
+            }
+
+            inferOutputFields();
             break;
 
           case STOP:
@@ -346,9 +431,15 @@ public class UnionAllRecordBatch extends 
AbstractRecordBatch<UnionAll> {
                 String.format("Unexpected state %s.", iterRight));
         }
 
+
+
         upstream = IterOutcome.OK_NEW_SCHEMA;
         return upstream;
       } else {
+        if(isBothSideEmpty()) {
+          return IterOutcome.NONE;
+        }
+
         unionAllRecordBatch.clearCurrentRecordBatch();
 
         if(leftIsFinish && rightIsFinish) {
@@ -431,9 +522,45 @@ public class UnionAllRecordBatch extends 
AbstractRecordBatch<UnionAll> {
       }
     }
 
+    /**
+     *
+     * Summarize the inference in the four different situations:
+     * First of all, the field names are always determined by the left side
+     * (Even when the left side is from an empty file, we have the column 
names.)
+     *
+     * Cases:
+     * 1. Left: non-empty; Right: non-empty
+     *      types determined by both sides with implicit casting involved
+     * 2. Left: empty; Right: non-empty
+     *      type from the right
+     * 3. Left: non-empty; Right: empty
+     *      types from the left
+     * 4. Left: empty; Right: empty
+     *      types are nullable integer
+     */
+    private void inferOutputFields() {
+      if(!leftIsFinish && !rightIsFinish) {
+        // Both sides are non-empty
+        inferOutputFieldsBothSide();
+      } else if(!rightIsFinish) {
+        // Left side is empty; right side is non-empty.
+        // Use the left side's column names as the output column names,
+        // and the right side's column types as the output column types.
+        inferOutputFieldsFromSingleSide(
+            leftSide.getRecordBatch().getSchema(),
+            rightSide.getRecordBatch().getSchema());
+      } else {
+        // Either right side is empty or both are empty
+        // Using left side's schema is sufficient
+        inferOutputFieldsFromSingleSide(
+            leftSide.getRecordBatch().getSchema(),
+            leftSide.getRecordBatch().getSchema());
+      }
+    }
+
     // The output table's column names always follow the left table,
     // where the output type is chosen based on DRILL's implicit casting rules
-    private void inferOutputFields() {
+    private void inferOutputFieldsBothSide() {
       outputFields = Lists.newArrayList();
       leftSchema = leftSide.getRecordBatch().getSchema();
       rightSchema = rightSide.getRecordBatch().getSchema();
@@ -482,12 +609,19 @@ public class UnionAllRecordBatch extends 
AbstractRecordBatch<UnionAll> {
       assert !leftIter.hasNext() && ! rightIter.hasNext() : "Mis-match of 
column count should have been detected when validating sqlNode at planning";
     }
 
-    private void inferOutputFieldsFromLeftSide() {
+    private void inferOutputFieldsFromSingleSide(final BatchSchema 
schemaForNames, final BatchSchema schemaForTypes) {
       outputFields = Lists.newArrayList();
-      Iterator<MaterializedField> iter = 
leftSide.getRecordBatch().getSchema().iterator();
-      while(iter.hasNext()) {
-        MaterializedField field = iter.next();
-        outputFields.add(MaterializedField.create(field.getPath(), 
field.getType()));
+
+      final List<String> outputColumnNames = Lists.newArrayList();
+      final Iterator<MaterializedField> iterForNames = 
schemaForNames.iterator();
+      while(iterForNames.hasNext()) {
+        outputColumnNames.add(iterForNames.next().getPath());
+      }
+
+      final Iterator<MaterializedField> iterForTypes = 
schemaForTypes.iterator();
+      for(int i = 0; iterForTypes.hasNext(); ++i) {
+        MaterializedField field = iterForTypes.next();
+        outputFields.add(MaterializedField.create(outputColumnNames.get(i), 
field.getType()));
       }
     }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/b979bebe/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java 
b/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java
index 8e6d846..7092a4f 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java
@@ -17,14 +17,18 @@
  */
 package org.apache.drill;
 
+import com.google.common.collect.Lists;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.util.FileUtils;
 import org.apache.drill.exec.work.foreman.SqlUnsupportedException;
 import org.apache.drill.exec.work.foreman.UnsupportedRelOperatorException;
-import org.junit.Ignore;
 import org.junit.Test;
 
+import java.util.List;
+
 public class TestUnionAll extends BaseTestQuery{
   private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(TestUnionAll.class);
 
@@ -527,6 +531,117 @@ public class TestUnionAll extends BaseTestQuery{
       .build().run();
   }
 
+  @Test
+  public void testUnionAllLeftEmptyJson() throws Exception {
+    final String rootEmpty = 
FileUtils.getResourceAsFile("/project/pushdown/empty.json").toURI().toString();
+    final String rootSimple = 
FileUtils.getResourceAsFile("/store/json/booleanData.json").toURI().toString();
+
+    final String queryLeftEmpty = String.format(
+        "select key from dfs_test.`%s` " +
+        "union all " +
+        "select key from dfs_test.`%s`",
+        rootEmpty,
+        rootSimple);
+
+    testBuilder()
+        .sqlQuery(queryLeftEmpty)
+        .unOrdered()
+        .baselineColumns("key")
+        .baselineValues(true)
+        .baselineValues(false)
+        .build()
+        .run();
+  }
+
+  @Test
+  public void testUnionAllBothEmptyJson() throws Exception {
+    final String rootEmpty = 
FileUtils.getResourceAsFile("/project/pushdown/empty.json").toURI().toString();
+    final String query = String.format(
+        "select key from dfs_test.`%s` " +
+            "union all " +
+            "select key from dfs_test.`%s`",
+        rootEmpty,
+        rootEmpty);
+
+    final List<Pair<SchemaPath, TypeProtos.MajorType>> expectedSchema = 
Lists.newArrayList();
+    final TypeProtos.MajorType majorType = TypeProtos.MajorType.newBuilder()
+        .setMinorType(TypeProtos.MinorType.INT)
+        .setMode(TypeProtos.DataMode.OPTIONAL)
+        .build();
+    expectedSchema.add(Pair.of(SchemaPath.getSimplePath("key"), majorType));
+
+    testBuilder()
+        .sqlQuery(query)
+        .schemaBaseLine(expectedSchema)
+        .build()
+        .run();
+  }
+
+  @Test
+  public void testUnionAllRightEmptyBatch() throws Exception {
+    String rootSimple = 
FileUtils.getResourceAsFile("/store/json/booleanData.json").toURI().toString();
+
+    String queryRightEmptyBatch = String.format(
+        "select key from dfs_test.`%s` " +
+            "union all " +
+            "select key from dfs_test.`%s` where 1 = 0",
+        rootSimple,
+        rootSimple);
+
+    testBuilder()
+        .sqlQuery(queryRightEmptyBatch)
+        .unOrdered()
+        .baselineColumns("key")
+        .baselineValues(true)
+        .baselineValues(false)
+        .build().run();
+  }
+
+  @Test
+  public void testUnionAllLeftEmptyBatch() throws Exception {
+    String rootSimple = 
FileUtils.getResourceAsFile("/store/json/booleanData.json").toURI().toString();
+
+    final String queryLeftBatch = String.format(
+        "select key from dfs_test.`%s` where 1 = 0 " +
+            "union all " +
+            "select key from dfs_test.`%s`",
+        rootSimple,
+        rootSimple);
+
+    testBuilder()
+        .sqlQuery(queryLeftBatch)
+        .unOrdered()
+        .baselineColumns("key")
+        .baselineValues(true)
+        .baselineValues(false)
+        .build()
+        .run();
+  }
+
+  @Test
+  public void testUnionAllBothEmptyBatch() throws Exception {
+    String rootSimple = 
FileUtils.getResourceAsFile("/store/json/booleanData.json").toURI().toString();
+    final String query = String.format(
+        "select key from dfs_test.`%s` where 1 = 0 " +
+            "union all " +
+            "select key from dfs_test.`%s` where 1 = 0",
+        rootSimple,
+        rootSimple);
+
+    final List<Pair<SchemaPath, TypeProtos.MajorType>> expectedSchema = 
Lists.newArrayList();
+    final TypeProtos.MajorType majorType = TypeProtos.MajorType.newBuilder()
+        .setMinorType(TypeProtos.MinorType.INT)
+        .setMode(TypeProtos.DataMode.OPTIONAL)
+        .build();
+    expectedSchema.add(Pair.of(SchemaPath.getSimplePath("key"), majorType));
+
+    testBuilder()
+        .sqlQuery(query)
+        .schemaBaseLine(expectedSchema)
+        .build()
+        .run();
+  }
+
   @Test // see DRILL-2746
   public void testFilterPushDownOverUnionAll() throws Exception {
     String query = "select n_regionkey from \n"
@@ -558,8 +673,7 @@ public class TestUnionAll extends BaseTestQuery{
   }
 
   @Test // see DRILL-2746
-  @Ignore("DRILL-4472")
-  public void testInListPushDownOverUnionAll() throws Exception {
+  public void testInListOnUnionAll() throws Exception {
     String query = "select n_nationkey \n" +
         "from (select n1.n_nationkey from cp.`tpch/nation.parquet` n1 inner 
join cp.`tpch/region.parquet` r1 on n1.n_regionkey = r1.r_regionkey \n" +
         "union all \n" +
@@ -571,13 +685,11 @@ public class TestUnionAll extends BaseTestQuery{
         ".*UnionAll.*\n" +
             ".*Project.*\n" +
                 ".*HashJoin.*\n" +
-                    ".*Project.*\n" +
-                        ".*Scan.*columns=\\[`n_regionkey`, 
`n_nationkey`\\].*\n" +
+                    ".*Scan.*columns=\\[`n_regionkey`, `n_nationkey`\\].*\n" +
                     ".*Scan.*columns=\\[`r_regionkey`\\].*\n" +
             ".*Project.*\n" +
                 ".*HashJoin.*\n" +
-                    ".*Project.*\n" +
-                        ".*Scan.*columns=\\[`n_regionkey`, 
`n_nationkey`\\].*\n" +
+                    ".*Scan.*columns=\\[`n_regionkey`, `n_nationkey`\\].*\n" +
                     ".*Scan.*columns=\\[`r_regionkey`\\].*"};
     final String[] excludedPlan = {};
     PlanTestBase.testPlanMatchingPatterns(query, expectedPlan, excludedPlan);

http://git-wip-us.apache.org/repos/asf/drill/blob/b979bebe/exec/java-exec/src/test/java/org/apache/drill/TestUnionDistinct.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/TestUnionDistinct.java 
b/exec/java-exec/src/test/java/org/apache/drill/TestUnionDistinct.java
index add9787..a615136 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestUnionDistinct.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestUnionDistinct.java
@@ -17,13 +17,18 @@
  */
 package org.apache.drill;
 
+import com.google.common.collect.Lists;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.util.FileUtils;
 import org.apache.drill.exec.work.foreman.SqlUnsupportedException;
 import org.apache.drill.exec.work.foreman.UnsupportedRelOperatorException;
 import org.junit.Test;
 
+import java.util.List;
+
 public class TestUnionDistinct extends BaseTestQuery {
   private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(TestUnionDistinct.class);
 
@@ -528,6 +533,117 @@ public class TestUnionDistinct extends BaseTestQuery {
   }
 
   @Test
+  public void testUnionDistinctLeftEmptyJson() throws Exception {
+    final String rootEmpty = 
FileUtils.getResourceAsFile("/project/pushdown/empty.json").toURI().toString();
+    final String rootSimple = 
FileUtils.getResourceAsFile("/store/json/booleanData.json").toURI().toString();
+
+    final String queryLeftEmpty = String.format(
+        "select key from dfs_test.`%s` " +
+            "union " +
+            "select key from dfs_test.`%s`",
+        rootEmpty,
+        rootSimple);
+
+    testBuilder()
+        .sqlQuery(queryLeftEmpty)
+        .unOrdered()
+        .baselineColumns("key")
+        .baselineValues(true)
+        .baselineValues(false)
+        .build()
+        .run();
+  }
+
+  @Test
+  public void testUnionDistinctBothEmptyJson() throws Exception {
+    final String rootEmpty = 
FileUtils.getResourceAsFile("/project/pushdown/empty.json").toURI().toString();
+    final String query = String.format(
+        "select key from dfs_test.`%s` " +
+            "union " +
+            "select key from dfs_test.`%s`",
+        rootEmpty,
+        rootEmpty);
+
+    final List<Pair<SchemaPath, TypeProtos.MajorType>> expectedSchema = 
Lists.newArrayList();
+    final TypeProtos.MajorType majorType = TypeProtos.MajorType.newBuilder()
+        .setMinorType(TypeProtos.MinorType.INT)
+        .setMode(TypeProtos.DataMode.OPTIONAL)
+        .build();
+    expectedSchema.add(Pair.of(SchemaPath.getSimplePath("key"), majorType));
+
+    testBuilder()
+        .sqlQuery(query)
+        .schemaBaseLine(expectedSchema)
+        .build()
+        .run();
+  }
+
+  @Test
+  public void testUnionDistinctRightEmptyBatch() throws Exception {
+    String rootSimple = 
FileUtils.getResourceAsFile("/store/json/booleanData.json").toURI().toString();
+
+    String queryRightEmptyBatch = String.format(
+        "select key from dfs_test.`%s` " +
+            "union " +
+            "select key from dfs_test.`%s` where 1 = 0",
+        rootSimple,
+        rootSimple);
+
+    testBuilder()
+        .sqlQuery(queryRightEmptyBatch)
+        .unOrdered()
+        .baselineColumns("key")
+        .baselineValues(true)
+        .baselineValues(false)
+        .build().run();
+  }
+
+  @Test
+  public void testUnionDistinctLeftEmptyBatch() throws Exception {
+    String rootSimple = 
FileUtils.getResourceAsFile("/store/json/booleanData.json").toURI().toString();
+
+    final String queryLeftBatch = String.format(
+        "select key from dfs_test.`%s` where 1 = 0 " +
+            "union " +
+            "select key from dfs_test.`%s`",
+        rootSimple,
+        rootSimple);
+
+    testBuilder()
+        .sqlQuery(queryLeftBatch)
+        .unOrdered()
+        .baselineColumns("key")
+        .baselineValues(true)
+        .baselineValues(false)
+        .build()
+        .run();
+  }
+
+  @Test
+  public void testUnionDistinctBothEmptyBatch() throws Exception {
+    String rootSimple = 
FileUtils.getResourceAsFile("/store/json/booleanData.json").toURI().toString();
+    final String query = String.format(
+        "select key from dfs_test.`%s` where 1 = 0 " +
+            "union " +
+            "select key from dfs_test.`%s` where 1 = 0",
+        rootSimple,
+        rootSimple);
+
+    final List<Pair<SchemaPath, TypeProtos.MajorType>> expectedSchema = 
Lists.newArrayList();
+    final TypeProtos.MajorType majorType = TypeProtos.MajorType.newBuilder()
+        .setMinorType(TypeProtos.MinorType.INT)
+        .setMode(TypeProtos.DataMode.OPTIONAL)
+        .build();
+    expectedSchema.add(Pair.of(SchemaPath.getSimplePath("key"), majorType));
+
+    testBuilder()
+        .sqlQuery(query)
+        .schemaBaseLine(expectedSchema)
+        .build()
+        .run();
+  }
+
+  @Test
   public void testFilterPushDownOverUnionDistinct() throws Exception {
     String query = "select n_regionkey from \n"
         + "(select n_regionkey from cp.`tpch/nation.parquet` union select 
r_regionkey from cp.`tpch/region.parquet`) \n"

Reply via email to