Repository: drill Updated Branches: refs/heads/master 1f23b8962 -> b979bebe8
DRILL-4476: Allow UnionAllRecordBatch to manage situations where left input side or both sides come(s) from empty source(s). close apache/drill#407 Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/b979bebe Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/b979bebe Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/b979bebe Branch: refs/heads/master Commit: b979bebe83d7017880b0763adcbf8eb80acfcee8 Parents: 1f23b89 Author: Hsuan-Yi Chu <hsua...@usc.edu> Authored: Fri Mar 4 13:50:02 2016 -0800 Committer: Aman Sinha <asi...@maprtech.com> Committed: Sat Mar 12 20:38:09 2016 -0800 ---------------------------------------------------------------------- .../impl/union/UnionAllRecordBatch.java | 174 ++++++++++++++++--- .../java/org/apache/drill/TestUnionAll.java | 126 +++++++++++++- .../org/apache/drill/TestUnionDistinct.java | 116 +++++++++++++ 3 files changed, 389 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/b979bebe/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java index a8b8aeb..57e80d7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java @@ -162,6 +162,25 @@ public class UnionAllRecordBatch extends AbstractRecordBatch<UnionAll> { allocationVectors = Lists.newArrayList(); transfers.clear(); + // If both sides of Union-All are empty + if(unionAllInput.isBothSideEmpty()) { + for(int i = 0; i < outputFields.size(); 
++i) { + final String colName = outputFields.get(i).getPath(); + final MajorType majorType = MajorType.newBuilder() + .setMinorType(MinorType.INT) + .setMode(DataMode.OPTIONAL) + .build(); + + MaterializedField outputField = MaterializedField.create(colName, majorType); + ValueVector vv = container.addOrGet(outputField, callBack); + allocationVectors.add(vv); + } + + container.buildSchema(BatchSchema.SelectionVectorMode.NONE); + return IterOutcome.OK_NEW_SCHEMA; + } + + final ClassGenerator<UnionAller> cg = CodeGenerator.getRoot(UnionAller.TEMPLATE_DEFINITION, context.getFunctionRegistry()); int index = 0; for(VectorWrapper<?> vw : current) { @@ -287,6 +306,7 @@ public class UnionAllRecordBatch extends AbstractRecordBatch<UnionAll> { // They are used to check if the schema is changed between recordbatches private BatchSchema leftSchema; private BatchSchema rightSchema; + private boolean bothEmpty = false; public UnionAllInput(UnionAllRecordBatch unionAllRecordBatch, RecordBatch left, RecordBatch right) { this.unionAllRecordBatch = unionAllRecordBatch; @@ -294,13 +314,52 @@ public class UnionAllRecordBatch extends AbstractRecordBatch<UnionAll> { rightSide = new OneSideInput(right); } + private void setBothSideEmpty(boolean bothEmpty) { + this.bothEmpty = bothEmpty; + } + + private boolean isBothSideEmpty() { + return bothEmpty; + } + public IterOutcome nextBatch() throws SchemaChangeException { if(upstream == RecordBatch.IterOutcome.NOT_YET) { IterOutcome iterLeft = leftSide.nextBatch(); switch(iterLeft) { case OK_NEW_SCHEMA: - break; + /* + * If the first few record batches are all empty, + * there is no way to tell whether these empty batches are coming from empty files. + * It is incorrect to infer output types when either side could be coming from empty. + * + * Thus, while-loop is necessary to skip those empty batches. 
+ */ + whileLoop: + while(leftSide.getRecordBatch().getRecordCount() == 0) { + iterLeft = leftSide.nextBatch(); + + switch(iterLeft) { + case STOP: + case OUT_OF_MEMORY: + return iterLeft; + + case NONE: + // Special Case: The left side was an empty input. + leftIsFinish = true; + break whileLoop; + + case NOT_YET: + case OK_NEW_SCHEMA: + case OK: + continue whileLoop; + + default: + throw new IllegalStateException( + String.format("Unexpected state %s.", iterLeft)); + } + } + break; case STOP: case OUT_OF_MEMORY: return iterLeft; @@ -315,26 +374,52 @@ public class UnionAllRecordBatch extends AbstractRecordBatch<UnionAll> { case OK_NEW_SCHEMA: // Unless there is no record batch on the left side of the inputs, // always start processing from the left side. - unionAllRecordBatch.setCurrentRecordBatch(leftSide.getRecordBatch()); - + if(leftIsFinish) { + unionAllRecordBatch.setCurrentRecordBatch(rightSide.getRecordBatch()); + } else { + unionAllRecordBatch.setCurrentRecordBatch(leftSide.getRecordBatch()); + } // If the record count of the first batch from right input is zero, // there are two possibilities: // 1. The right side is an empty input (e.g., file). // 2. There will be more records carried by later batches. - if (rightSide.getRecordBatch().getRecordCount() == 0) { - iterRight = rightSide.nextBatch(); - if (iterRight == IterOutcome.NONE) { - // Case 1: The right side was an empty input. - inferOutputFieldsFromLeftSide(); - rightIsFinish = true; - } else { - // Case 2: There are more records carried by the latter batches. - inferOutputFields(); + /* + * If the first few record batches are all empty, + * there is no way to tell whether these empty batches are coming from empty files. + * It is incorrect to infer output types when either side could be coming from empty. + * + * Thus, while-loop is necessary to skip those empty batches. 
+ */ + whileLoop: + while(rightSide.getRecordBatch().getRecordCount() == 0) { + iterRight = rightSide.nextBatch(); + switch(iterRight) { + case STOP: + case OUT_OF_MEMORY: + return iterRight; + + case NONE: + // Special Case: The right side was an empty input. + rightIsFinish = true; + break whileLoop; + + case NOT_YET: + case OK_NEW_SCHEMA: + case OK: + continue whileLoop; + + default: + throw new IllegalStateException( + String.format("Unexpected state %s.", iterRight)); } - } else { - inferOutputFields(); } + + if(leftIsFinish && rightIsFinish) { + setBothSideEmpty(true); + } + + inferOutputFields(); break; case STOP: @@ -346,9 +431,15 @@ public class UnionAllRecordBatch extends AbstractRecordBatch<UnionAll> { String.format("Unexpected state %s.", iterRight)); } + + upstream = IterOutcome.OK_NEW_SCHEMA; return upstream; } else { + if(isBothSideEmpty()) { + return IterOutcome.NONE; + } + unionAllRecordBatch.clearCurrentRecordBatch(); if(leftIsFinish && rightIsFinish) { @@ -431,9 +522,45 @@ public class UnionAllRecordBatch extends AbstractRecordBatch<UnionAll> { } } + /** + * + * Summarize the inference in the four different situations: + * First of all, the field names are always determined by the left side + * (Even when the left side is from an empty file, we have the column names.) + * + * Cases: + * 1. Left: non-empty; Right: non-empty + * types determined by both sides with implicit casting involved + * 2. Left: empty; Right: non-empty + * type from the right + * 3. Left: non-empty; Right: empty + * types from the left + * 4. Left: empty; Right: empty + * types are nullable integer + */ + private void inferOutputFields() { + if(!leftIsFinish && !rightIsFinish) { + // Both sides are non-empty + inferOutputFieldsBothSide(); + } else if(!rightIsFinish) { + // Left side is non-empty + // While use left side's column names as output column names, + // use right side's column types as output column types. 
+ inferOutputFieldsFromSingleSide( + leftSide.getRecordBatch().getSchema(), + rightSide.getRecordBatch().getSchema()); + } else { + // Either right side is empty or both are empty + // Using left side's schema is sufficient + inferOutputFieldsFromSingleSide( + leftSide.getRecordBatch().getSchema(), + leftSide.getRecordBatch().getSchema()); + } + } + // The output table's column names always follow the left table, // where the output type is chosen based on DRILL's implicit casting rules - private void inferOutputFields() { + private void inferOutputFieldsBothSide() { outputFields = Lists.newArrayList(); leftSchema = leftSide.getRecordBatch().getSchema(); rightSchema = rightSide.getRecordBatch().getSchema(); @@ -482,12 +609,19 @@ public class UnionAllRecordBatch extends AbstractRecordBatch<UnionAll> { assert !leftIter.hasNext() && ! rightIter.hasNext() : "Mis-match of column count should have been detected when validating sqlNode at planning"; } - private void inferOutputFieldsFromLeftSide() { + private void inferOutputFieldsFromSingleSide(final BatchSchema schemaForNames, final BatchSchema schemaForTypes) { outputFields = Lists.newArrayList(); - Iterator<MaterializedField> iter = leftSide.getRecordBatch().getSchema().iterator(); - while(iter.hasNext()) { - MaterializedField field = iter.next(); - outputFields.add(MaterializedField.create(field.getPath(), field.getType())); + + final List<String> outputColumnNames = Lists.newArrayList(); + final Iterator<MaterializedField> iterForNames = schemaForNames.iterator(); + while(iterForNames.hasNext()) { + outputColumnNames.add(iterForNames.next().getPath()); + } + + final Iterator<MaterializedField> iterForTypes = schemaForTypes.iterator(); + for(int i = 0; iterForTypes.hasNext(); ++i) { + MaterializedField field = iterForTypes.next(); + outputFields.add(MaterializedField.create(outputColumnNames.get(i), field.getType())); } } 
http://git-wip-us.apache.org/repos/asf/drill/blob/b979bebe/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java b/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java index 8e6d846..7092a4f 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java +++ b/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java @@ -17,14 +17,18 @@ */ package org.apache.drill; +import com.google.common.collect.Lists; +import org.apache.commons.lang3.tuple.Pair; import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.types.TypeProtos; import org.apache.drill.common.util.FileUtils; import org.apache.drill.exec.work.foreman.SqlUnsupportedException; import org.apache.drill.exec.work.foreman.UnsupportedRelOperatorException; -import org.junit.Ignore; import org.junit.Test; +import java.util.List; + public class TestUnionAll extends BaseTestQuery{ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestUnionAll.class); @@ -527,6 +531,117 @@ public class TestUnionAll extends BaseTestQuery{ .build().run(); } + @Test + public void testUnionAllLeftEmptyJson() throws Exception { + final String rootEmpty = FileUtils.getResourceAsFile("/project/pushdown/empty.json").toURI().toString(); + final String rootSimple = FileUtils.getResourceAsFile("/store/json/booleanData.json").toURI().toString(); + + final String queryLeftEmpty = String.format( + "select key from dfs_test.`%s` " + + "union all " + + "select key from dfs_test.`%s`", + rootEmpty, + rootSimple); + + testBuilder() + .sqlQuery(queryLeftEmpty) + .unOrdered() + .baselineColumns("key") + .baselineValues(true) + .baselineValues(false) + .build() + .run(); + } + + @Test + public void testUnionAllBothEmptyJson() throws Exception { + final 
String rootEmpty = FileUtils.getResourceAsFile("/project/pushdown/empty.json").toURI().toString(); + final String query = String.format( + "select key from dfs_test.`%s` " + + "union all " + + "select key from dfs_test.`%s`", + rootEmpty, + rootEmpty); + + final List<Pair<SchemaPath, TypeProtos.MajorType>> expectedSchema = Lists.newArrayList(); + final TypeProtos.MajorType majorType = TypeProtos.MajorType.newBuilder() + .setMinorType(TypeProtos.MinorType.INT) + .setMode(TypeProtos.DataMode.OPTIONAL) + .build(); + expectedSchema.add(Pair.of(SchemaPath.getSimplePath("key"), majorType)); + + testBuilder() + .sqlQuery(query) + .schemaBaseLine(expectedSchema) + .build() + .run(); + } + + @Test + public void testUnionAllRightEmptyBatch() throws Exception { + String rootSimple = FileUtils.getResourceAsFile("/store/json/booleanData.json").toURI().toString(); + + String queryRightEmptyBatch = String.format( + "select key from dfs_test.`%s` " + + "union all " + + "select key from dfs_test.`%s` where 1 = 0", + rootSimple, + rootSimple); + + testBuilder() + .sqlQuery(queryRightEmptyBatch) + .unOrdered() + .baselineColumns("key") + .baselineValues(true) + .baselineValues(false) + .build().run(); + } + + @Test + public void testUnionAllLeftEmptyBatch() throws Exception { + String rootSimple = FileUtils.getResourceAsFile("/store/json/booleanData.json").toURI().toString(); + + final String queryLeftBatch = String.format( + "select key from dfs_test.`%s` where 1 = 0 " + + "union all " + + "select key from dfs_test.`%s`", + rootSimple, + rootSimple); + + testBuilder() + .sqlQuery(queryLeftBatch) + .unOrdered() + .baselineColumns("key") + .baselineValues(true) + .baselineValues(false) + .build() + .run(); + } + + @Test + public void testUnionAllBothEmptyBatch() throws Exception { + String rootSimple = FileUtils.getResourceAsFile("/store/json/booleanData.json").toURI().toString(); + final String query = String.format( + "select key from dfs_test.`%s` where 1 = 0 " + + "union all " + + 
"select key from dfs_test.`%s` where 1 = 0", + rootSimple, + rootSimple); + + final List<Pair<SchemaPath, TypeProtos.MajorType>> expectedSchema = Lists.newArrayList(); + final TypeProtos.MajorType majorType = TypeProtos.MajorType.newBuilder() + .setMinorType(TypeProtos.MinorType.INT) + .setMode(TypeProtos.DataMode.OPTIONAL) + .build(); + expectedSchema.add(Pair.of(SchemaPath.getSimplePath("key"), majorType)); + + testBuilder() + .sqlQuery(query) + .schemaBaseLine(expectedSchema) + .build() + .run(); + } + @Test // see DRILL-2746 public void testFilterPushDownOverUnionAll() throws Exception { String query = "select n_regionkey from \n" @@ -558,8 +673,7 @@ public class TestUnionAll extends BaseTestQuery{ } @Test // see DRILL-2746 - @Ignore("DRILL-4472") - public void testInListPushDownOverUnionAll() throws Exception { + public void testInListOnUnionAll() throws Exception { String query = "select n_nationkey \n" + "from (select n1.n_nationkey from cp.`tpch/nation.parquet` n1 inner join cp.`tpch/region.parquet` r1 on n1.n_regionkey = r1.r_regionkey \n" + "union all \n" + @@ -571,13 +685,11 @@ public class TestUnionAll extends BaseTestQuery{ ".*UnionAll.*\n" + ".*Project.*\n" + ".*HashJoin.*\n" + - ".*Project.*\n" + - ".*Scan.*columns=\\[`n_regionkey`, `n_nationkey`\\].*\n" + + ".*Scan.*columns=\\[`n_regionkey`, `n_nationkey`\\].*\n" + ".*Scan.*columns=\\[`r_regionkey`\\].*\n" + ".*Project.*\n" + ".*HashJoin.*\n" + - ".*Project.*\n" + - ".*Scan.*columns=\\[`n_regionkey`, `n_nationkey`\\].*\n" + + ".*Scan.*columns=\\[`n_regionkey`, `n_nationkey`\\].*\n" + ".*Scan.*columns=\\[`r_regionkey`\\].*"}; final String[] excludedPlan = {}; PlanTestBase.testPlanMatchingPatterns(query, expectedPlan, excludedPlan); http://git-wip-us.apache.org/repos/asf/drill/blob/b979bebe/exec/java-exec/src/test/java/org/apache/drill/TestUnionDistinct.java ---------------------------------------------------------------------- diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/TestUnionDistinct.java b/exec/java-exec/src/test/java/org/apache/drill/TestUnionDistinct.java index add9787..a615136 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/TestUnionDistinct.java +++ b/exec/java-exec/src/test/java/org/apache/drill/TestUnionDistinct.java @@ -17,13 +17,18 @@ */ package org.apache.drill; +import com.google.common.collect.Lists; +import org.apache.commons.lang3.tuple.Pair; import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.types.TypeProtos; import org.apache.drill.common.util.FileUtils; import org.apache.drill.exec.work.foreman.SqlUnsupportedException; import org.apache.drill.exec.work.foreman.UnsupportedRelOperatorException; import org.junit.Test; +import java.util.List; + public class TestUnionDistinct extends BaseTestQuery { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestUnionDistinct.class); @@ -528,6 +533,117 @@ public class TestUnionDistinct extends BaseTestQuery { } @Test + public void testUnionDistinctLeftEmptyJson() throws Exception { + final String rootEmpty = FileUtils.getResourceAsFile("/project/pushdown/empty.json").toURI().toString(); + final String rootSimple = FileUtils.getResourceAsFile("/store/json/booleanData.json").toURI().toString(); + + final String queryLeftEmpty = String.format( + "select key from dfs_test.`%s` " + + "union " + + "select key from dfs_test.`%s`", + rootEmpty, + rootSimple); + + testBuilder() + .sqlQuery(queryLeftEmpty) + .unOrdered() + .baselineColumns("key") + .baselineValues(true) + .baselineValues(false) + .build() + .run(); + } + + @Test + public void testUnionDistinctBothEmptyJson() throws Exception { + final String rootEmpty = FileUtils.getResourceAsFile("/project/pushdown/empty.json").toURI().toString(); + final String query = String.format( + "select key from dfs_test.`%s` " + + "union " + + "select key from 
dfs_test.`%s`", + rootEmpty, + rootEmpty); + + final List<Pair<SchemaPath, TypeProtos.MajorType>> expectedSchema = Lists.newArrayList(); + final TypeProtos.MajorType majorType = TypeProtos.MajorType.newBuilder() + .setMinorType(TypeProtos.MinorType.INT) + .setMode(TypeProtos.DataMode.OPTIONAL) + .build(); + expectedSchema.add(Pair.of(SchemaPath.getSimplePath("key"), majorType)); + + testBuilder() + .sqlQuery(query) + .schemaBaseLine(expectedSchema) + .build() + .run(); + } + + @Test + public void testUnionDistinctRightEmptyBatch() throws Exception { + String rootSimple = FileUtils.getResourceAsFile("/store/json/booleanData.json").toURI().toString(); + + String queryRightEmptyBatch = String.format( + "select key from dfs_test.`%s` " + + "union " + + "select key from dfs_test.`%s` where 1 = 0", + rootSimple, + rootSimple); + + testBuilder() + .sqlQuery(queryRightEmptyBatch) + .unOrdered() + .baselineColumns("key") + .baselineValues(true) + .baselineValues(false) + .build().run(); + } + + @Test + public void testUnionDistinctLeftEmptyBatch() throws Exception { + String rootSimple = FileUtils.getResourceAsFile("/store/json/booleanData.json").toURI().toString(); + + final String queryLeftBatch = String.format( + "select key from dfs_test.`%s` where 1 = 0 " + + "union " + + "select key from dfs_test.`%s`", + rootSimple, + rootSimple); + + testBuilder() + .sqlQuery(queryLeftBatch) + .unOrdered() + .baselineColumns("key") + .baselineValues(true) + .baselineValues(false) + .build() + .run(); + } + + @Test + public void testUnionDistinctBothEmptyBatch() throws Exception { + String rootSimple = FileUtils.getResourceAsFile("/store/json/booleanData.json").toURI().toString(); + final String query = String.format( + "select key from dfs_test.`%s` where 1 = 0 " + + "union " + + "select key from dfs_test.`%s` where 1 = 0", + rootSimple, + rootSimple); + + final List<Pair<SchemaPath, TypeProtos.MajorType>> expectedSchema = Lists.newArrayList(); + final TypeProtos.MajorType majorType 
= TypeProtos.MajorType.newBuilder() + .setMinorType(TypeProtos.MinorType.INT) + .setMode(TypeProtos.DataMode.OPTIONAL) + .build(); + expectedSchema.add(Pair.of(SchemaPath.getSimplePath("key"), majorType)); + + testBuilder() + .sqlQuery(query) + .schemaBaseLine(expectedSchema) + .build() + .run(); + } + + @Test public void testFilterPushDownOverUnionDistinct() throws Exception { String query = "select n_regionkey from \n" + "(select n_regionkey from cp.`tpch/nation.parquet` union select r_regionkey from cp.`tpch/region.parquet`) \n"