This is an automated email from the ASF dual-hosted git repository. parthc pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/drill.git
commit 3af718f050f28210d4c5dcf359c617b2300dd736 Author: hmaduri <hmad...@maprtech.com> AuthorDate: Tue May 15 13:51:45 2018 -0700 DRILL-6431: Unnest operator requires table and a single column alias to be specified. Fixing the issues related to star column renaming, same field name renaming and also enforcing that an alias column is required for the unnest operator. --- .../java/org/apache/drill/exec/ExecConstants.java | 1 + .../physical/impl/unnest/UnnestRecordBatch.java | 16 +- .../drill/exec/planner/physical/CorrelatePrel.java | 45 ++++- .../drill/exec/planner/physical/UnnestPrel.java | 2 +- .../planner/physical/visitor/BasePrelVisitor.java | 12 ++ .../physical/visitor/JoinPrelRenameVisitor.java | 38 +++- .../exec/planner/physical/visitor/PrelVisitor.java | 4 + .../physical/visitor/PrelVisualizerVisitor.java | 15 +- .../physical/visitor/StarColumnConverter.java | 28 ++- .../drill/exec/planner/sql/SqlConverter.java | 7 + .../impl/lateraljoin/TestE2EUnnestAndLateral.java | 48 ++--- .../impl/lateraljoin/TestLateralPlans.java | 212 ++++++++++++++++++--- .../impl/unnest/TestUnnestCorrectness.java | 2 - .../unnest/TestUnnestWithLateralCorrectness.java | 13 +- .../exec/physical/unit/PhysicalOpUnitTestBase.java | 30 +-- .../planner/logical/DrillLogicalTestutils.java | 70 +++++++ .../org/apache/drill/test/DrillTestWrapper.java | 21 ++ pom.xml | 2 +- 18 files changed, 454 insertions(+), 112 deletions(-) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java index b4bed59..1070d76 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java @@ -71,6 +71,7 @@ public final class ExecConstants { public static final String SPOOLING_BUFFER_DELETE = "drill.exec.buffer.spooling.delete"; public static final String SPOOLING_BUFFER_MEMORY = "drill.exec.buffer.spooling.size"; public static final String BATCH_PURGE_THRESHOLD = "drill.exec.sort.purge.threshold"; + public static final String ENABLE_UNNEST_LATERAL_KEY = "planner.enable_unnest_lateral"; // Spill boot-time Options common to all spilling operators // (Each individual operator may override the common options) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java index ed5d91c..57a0ade 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java @@ -21,7 +21,6 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import org.apache.drill.common.exceptions.UserException; import org.apache.drill.common.expression.FieldReference; -import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.exception.OutOfMemoryException; import org.apache.drill.exec.exception.SchemaChangeException; @@ -37,11 +36,9 @@ import org.apache.drill.exec.record.TransferPair; import org.apache.drill.exec.record.TypedFieldId; import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.vector.ValueVector; -import org.apache.drill.exec.vector.complex.MapVector; import org.apache.drill.exec.vector.complex.RepeatedMapVector; import org.apache.drill.exec.vector.complex.RepeatedValueVector; -import java.util.Iterator; import java.util.List; import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK; @@ -351,23 +348,14 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO recordCount = 0; final List<TransferPair> transfers = Lists.newArrayList(); - //TODO: fixthis once planner changes are done final FieldReference fieldReference = - new FieldReference(SchemaPath.getSimplePath(popConfig.getColumn().toString() + "_flat")); + new FieldReference(popConfig.getColumn()); final TransferPair transferPair = getUnnestFieldTransferPair(fieldReference); final ValueVector unnestVector = transferPair.getTo(); transfers.add(transferPair); - if (unnestVector instanceof MapVector) { - Iterator<ValueVector> it = unnestVector.iterator(); - while (it.hasNext()) { - container.add(it.next()); - } - } - else { - container.add(unnestVector); - } + container.add(unnestVector); logger.debug("Added transfer for unnest expression."); container.buildSchema(SelectionVectorMode.NONE); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/CorrelatePrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/CorrelatePrel.java index 9d308f0..9938db1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/CorrelatePrel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/CorrelatePrel.java @@ -17,21 +17,29 @@ */ package org.apache.drill.exec.planner.physical; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelTraitSet; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.core.Correlate; import org.apache.calcite.rel.core.CorrelationId; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.rex.RexUtil; import org.apache.calcite.sql.SemiJoinType; import org.apache.calcite.util.ImmutableBitSet; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.config.LateralJoinPOP; import org.apache.drill.exec.planner.common.DrillCorrelateRelBase; +import org.apache.drill.exec.planner.common.DrillJoinRelBase; import org.apache.drill.exec.planner.physical.visitor.PrelVisitor; import org.apache.drill.exec.record.BatchSchema; import java.io.IOException; import java.util.Iterator; +import java.util.List; public class CorrelatePrel extends DrillCorrelateRelBase implements Prel { @@ -60,10 +68,45 @@ public class CorrelatePrel extends DrillCorrelateRelBase implements Prel { return creator.addMetadata(this, ljoin); } + /** + * Check to make sure that the fields of the inputs are the same as the output field names. + * If not, insert a project renaming them. + */ + public RelNode getCorrelateInput(int offset, RelNode input) { + Preconditions.checkArgument(DrillJoinRelBase.uniqueFieldNames(input.getRowType())); + final List<String> fields = getRowType().getFieldNames(); + final List<String> inputFields = input.getRowType().getFieldNames(); + final List<String> outputFields = fields.subList(offset, offset + inputFields.size()); + if (!outputFields.equals(inputFields)) { + // Ensure that input field names are the same as output field names. + // If there are duplicate field names on left and right, fields will get + // lost. + // In such case, we need insert a rename Project on top of the input. + return rename(input, input.getRowType().getFieldList(), outputFields); + } else { + return input; + } + } + + private RelNode rename(RelNode input, List<RelDataTypeField> inputFields, List<String> outputFieldNames) { + List<RexNode> exprs = Lists.newArrayList(); + + for (RelDataTypeField field : inputFields) { + RexNode expr = input.getCluster().getRexBuilder().makeInputRef(field.getType(), field.getIndex()); + exprs.add(expr); + } + + RelDataType rowType = RexUtil.createStructType(input.getCluster().getTypeFactory(), + exprs, outputFieldNames, null); + + ProjectPrel proj = new ProjectPrel(input.getCluster(), input.getTraitSet(), input, exprs, rowType); + + return proj; + } @Override public <T, X, E extends Throwable> T accept(PrelVisitor<T, X, E> visitor, X value) throws E { - return visitor.visitPrel(this, value); + return visitor.visitCorrelate(this, value); } @Override diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnnestPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnnestPrel.java index 1e87305..cd598eb 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnnestPrel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnnestPrel.java @@ -51,7 +51,7 @@ public class UnnestPrel extends DrillUnnestRelBase implements Prel { @Override public <T, X, E extends Throwable> T accept(PrelVisitor<T, X, E> visitor, X value) throws E { - return visitor.visitPrel(this, value); + return visitor.visitUnnest(this, value); } @Override diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/BasePrelVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/BasePrelVisitor.java index 02c4709..04b7c18 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/BasePrelVisitor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/BasePrelVisitor.java @@ -24,6 +24,8 @@ import org.apache.drill.exec.planner.physical.ProjectPrel; import org.apache.drill.exec.planner.physical.ScanPrel; import org.apache.drill.exec.planner.physical.ScreenPrel; import org.apache.drill.exec.planner.physical.WriterPrel; +import org.apache.drill.exec.planner.physical.UnnestPrel; +import org.apache.drill.exec.planner.physical.CorrelatePrel; public class BasePrelVisitor<RETURN, EXTRA, EXCEP extends Throwable> implements PrelVisitor<RETURN, EXTRA, EXCEP> { @@ -63,4 +65,14 @@ public class BasePrelVisitor<RETURN, EXTRA, EXCEP extends Throwable> implements throw new UnsupportedOperationException(String.format("No visit method defined for prel %s in visitor %s.", prel, this.getClass().getName())); } + @Override + public RETURN visitUnnest(UnnestPrel prel, EXTRA value) throws EXCEP { + return visitPrel(prel, value); + } + + @Override + public RETURN visitCorrelate(CorrelatePrel prel, EXTRA value) throws EXCEP { + return visitPrel(prel, value); + } + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/JoinPrelRenameVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/JoinPrelRenameVisitor.java index cddac41..dfb4036 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/JoinPrelRenameVisitor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/JoinPrelRenameVisitor.java @@ -20,6 +20,7 @@ package org.apache.drill.exec.planner.physical.visitor; import java.util.List; import org.apache.drill.exec.planner.physical.JoinPrel; +import org.apache.drill.exec.planner.physical.CorrelatePrel; import org.apache.drill.exec.planner.physical.Prel; import org.apache.calcite.rel.RelNode; @@ -35,25 +36,26 @@ public class JoinPrelRenameVisitor extends BasePrelVisitor<Prel, Void, RuntimeEx @Override public Prel visitPrel(Prel prel, Void value) throws RuntimeException { + return preparePrel(prel, getChildren(prel)); + } + + private List<RelNode> getChildren(Prel prel) { List<RelNode> children = Lists.newArrayList(); for(Prel child : prel){ child = child.accept(this, null); children.add(child); } + return children; + } - return (Prel) prel.copy(prel.getTraitSet(), children); - + private Prel preparePrel(Prel prel, List<RelNode> renamedNodes) { + return (Prel) prel.copy(prel.getTraitSet(), renamedNodes); } @Override public Prel visitJoin(JoinPrel prel, Void value) throws RuntimeException { - List<RelNode> children = Lists.newArrayList(); - - for(Prel child : prel){ - child = child.accept(this, null); - children.add(child); - } + List<RelNode> children = getChildren(prel); final int leftCount = children.get(0).getRowType().getFieldCount(); @@ -65,7 +67,25 @@ public class JoinPrelRenameVisitor extends BasePrelVisitor<Prel, Void, RuntimeEx reNamedChildren.add(left); reNamedChildren.add(right); - return (Prel) prel.copy(prel.getTraitSet(), reNamedChildren); + return preparePrel(prel, reNamedChildren); } + //TODO: consolidate this code with join column renaming. + @Override + public Prel visitCorrelate(CorrelatePrel prel, Void value) throws RuntimeException { + + List<RelNode> children = getChildren(prel); + + final int leftCount = children.get(0).getRowType().getFieldCount(); + + List<RelNode> reNamedChildren = Lists.newArrayList(); + + RelNode left = prel.getCorrelateInput(0, children.get(0)); + RelNode right = prel.getCorrelateInput(leftCount, children.get(1)); + + reNamedChildren.add(left); + reNamedChildren.add(right); + + return preparePrel(prel, reNamedChildren); + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/PrelVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/PrelVisitor.java index bd81e98..0e7bbf6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/PrelVisitor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/PrelVisitor.java @@ -24,6 +24,8 @@ import org.apache.drill.exec.planner.physical.ProjectPrel; import org.apache.drill.exec.planner.physical.ScanPrel; import org.apache.drill.exec.planner.physical.ScreenPrel; import org.apache.drill.exec.planner.physical.WriterPrel; +import org.apache.drill.exec.planner.physical.UnnestPrel; +import org.apache.drill.exec.planner.physical.CorrelatePrel; public interface PrelVisitor<RETURN, EXTRA, EXCEP extends Throwable> { @@ -36,5 +38,7 @@ public interface PrelVisitor<RETURN, EXTRA, EXCEP extends Throwable> { public RETURN visitJoin(JoinPrel prel, EXTRA value) throws EXCEP; public RETURN visitProject(ProjectPrel prel, EXTRA value) throws EXCEP; public RETURN visitPrel(Prel prel, EXTRA value) throws EXCEP; + public RETURN visitUnnest(UnnestPrel prel, EXTRA value) throws EXCEP; + public RETURN visitCorrelate(CorrelatePrel prel, EXTRA value) throws EXCEP; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/PrelVisualizerVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/PrelVisualizerVisitor.java index 0ee685f..253325b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/PrelVisualizerVisitor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/PrelVisualizerVisitor.java @@ -24,6 +24,8 @@ import org.apache.drill.exec.planner.physical.ProjectPrel; import org.apache.drill.exec.planner.physical.ScanPrel; import org.apache.drill.exec.planner.physical.ScreenPrel; import org.apache.drill.exec.planner.physical.WriterPrel; +import org.apache.drill.exec.planner.physical.UnnestPrel; +import org.apache.drill.exec.planner.physical.CorrelatePrel; /** * Debug-time class that prints a PRel tree to the console for @@ -69,7 +71,7 @@ public class PrelVisualizerVisitor } private void indent() { - for (int i = 0; i < level; i++) { + for (int i = 0; i < level; i++) { out.append(INDENT); } } @@ -225,4 +227,15 @@ public class PrelVisualizerVisitor return null; } + @Override + public Void visitUnnest(UnnestPrel prel, VisualizationState value) throws Exception { + visitPrel(prel, value); + return null; + } + + @Override + public Void visitCorrelate(CorrelatePrel prel, VisualizationState value) throws Exception { + visitPrel(prel, value); + return null; + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/StarColumnConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/StarColumnConverter.java index 6e860cc..ac491e9 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/StarColumnConverter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/StarColumnConverter.java @@ -30,6 +30,7 @@ import org.apache.drill.exec.planner.physical.ProjectPrel; import org.apache.drill.exec.planner.physical.ScanPrel; import org.apache.drill.exec.planner.physical.ScreenPrel; import org.apache.drill.exec.planner.physical.WriterPrel; +import org.apache.drill.exec.planner.physical.UnnestPrel; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeField; @@ -189,14 +190,13 @@ public class StarColumnConverter extends BasePrelVisitor<Prel, Void, RuntimeExce return (Prel) prel.copy(prel.getTraitSet(), children); } - @Override - public Prel visitScan(ScanPrel scanPrel, Void value) throws RuntimeException { - if (StarColumnHelper.containsStarColumn(scanPrel.getRowType()) && prefixedForStar) { + private Prel prefixTabNameToStar(Prel prel, Void value) throws RuntimeException { + if (StarColumnHelper.containsStarColumn(prel.getRowType()) && prefixedForStar) { List<RexNode> exprs = Lists.newArrayList(); - for (RelDataTypeField field : scanPrel.getRowType().getFieldList()) { - RexNode expr = scanPrel.getCluster().getRexBuilder().makeInputRef(field.getType(), field.getIndex()); + for (RelDataTypeField field : prel.getRowType().getFieldList()) { + RexNode expr = prel.getCluster().getRexBuilder().makeInputRef(field.getType(), field.getIndex()); exprs.add(expr); } @@ -204,25 +204,35 @@ public class StarColumnConverter extends BasePrelVisitor<Prel, Void, RuntimeExce long tableId = tableNumber.getAndIncrement(); - for (String name : scanPrel.getRowType().getFieldNames()) { + for (String name : prel.getRowType().getFieldNames()) { if (StarColumnHelper.isNonPrefixedStarColumn(name)) { fieldNames.add("T" + tableId + StarColumnHelper.PREFIX_DELIMITER + name); // Add prefix to * column. } else { fieldNames.add(name); // Keep regular column as it is. } } - RelDataType rowType = RexUtil.createStructType(scanPrel.getCluster().getTypeFactory(), + RelDataType rowType = RexUtil.createStructType(prel.getCluster().getTypeFactory(), exprs, fieldNames, null); // insert a PAS. - ProjectPrel proj = new ProjectPrel(scanPrel.getCluster(), scanPrel.getTraitSet(), scanPrel, exprs, rowType); + ProjectPrel proj = new ProjectPrel(prel.getCluster(), prel.getTraitSet(), prel, exprs, rowType); return proj; } else { - return visitPrel(scanPrel, value); + return visitPrel(prel, value); } } + @Override + public Prel visitScan(ScanPrel scanPrel, Void value) throws RuntimeException { + return prefixTabNameToStar(scanPrel, value); + } + + @Override + public Prel visitUnnest(UnnestPrel unnestPrel, Void value) throws RuntimeException { + return prefixTabNameToStar(unnestPrel, value); + } + private List<String> makeUniqueNames(List<String> names) { // We have to search the set of original names, plus the set of unique names that will be used finally . diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java index c7e1b25..b8659d1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java @@ -52,6 +52,7 @@ import org.apache.calcite.schema.SchemaPlus; import org.apache.calcite.sql.SqlCall; import org.apache.calcite.sql.SqlIdentifier; import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlKind; import org.apache.calcite.sql.SqlOperatorTable; import org.apache.calcite.sql.parser.SqlParseException; import org.apache.calcite.sql.parser.SqlParser; @@ -81,6 +82,7 @@ import org.apache.drill.exec.planner.logical.DrillRelFactories; import org.apache.drill.exec.planner.physical.DrillDistributionTraitDef; import org.apache.drill.exec.planner.physical.PlannerSettings; import org.apache.drill.exec.rpc.user.UserSession; +import static org.apache.calcite.util.Static.RESOURCE; import com.google.common.base.Joiner; import org.apache.drill.exec.store.ColumnExplorer; @@ -261,6 +263,11 @@ public class SqlConverter { } changeNamesIfTableIsTemporary(tempNode); } + else if (((SqlCall) node).operand(0).getKind() == SqlKind.UNNEST) { + if (((SqlCall) node).operandCount() < 3) { + throw RESOURCE.validationError("Alias table and column name are required for UNNEST").ex(); + } + } break; default: break; diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java index b4c9fdd..3a17ef5 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java @@ -19,7 +19,6 @@ package org.apache.drill.exec.physical.impl.lateraljoin; import org.apache.drill.test.BaseTestQuery; import org.junit.BeforeClass; -import org.junit.Ignore; import org.junit.Test; import java.nio.file.Paths; @@ -27,7 +26,6 @@ import java.nio.file.Paths; import static junit.framework.TestCase.fail; public class TestE2EUnnestAndLateral extends BaseTestQuery { - //private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestE2EUnnestAndLateral.class); private static final String regularTestFile_1 = "cust_order_10_1.json"; private static final String regularTestFile_2 = "cust_order_10_2.json"; @@ -51,7 +49,7 @@ public class TestE2EUnnestAndLateral extends BaseTestQuery { public void testLateral_WithLimitInSubQuery() throws Exception { String Sql = "SELECT customer.c_name, customer.c_address, orders.o_id, orders.o_amount " + "FROM cp.`lateraljoin/nested-customer.parquet` customer, LATERAL " + - "(SELECT O.o_id, O.o_amount FROM UNNEST(customer.orders) O LIMIT 1) orders"; + "(SELECT t.ord.o_id as o_id, t.ord.o_amount as o_amount FROM UNNEST(customer.orders) t(ord) LIMIT 1) orders"; test(Sql); } @@ -59,7 +57,7 @@ public class TestE2EUnnestAndLateral extends BaseTestQuery { public void testLateral_WithFilterInSubQuery() throws Exception { String Sql = "SELECT customer.c_name, customer.c_address, orders.o_id, orders.o_amount " + "FROM cp.`lateraljoin/nested-customer.parquet` customer, LATERAL " + - "(SELECT O.o_id, O.o_amount FROM UNNEST(customer.orders) O WHERE O.o_amount > 10) orders"; + "(SELECT t.ord.o_id as o_id, t.ord.o_amount as o_amount FROM UNNEST(customer.orders) t(ord) WHERE t.ord.o_amount > 10) orders"; test(Sql); } @@ -67,7 +65,7 @@ public class TestE2EUnnestAndLateral extends BaseTestQuery { public void testLateral_WithFilterAndLimitInSubQuery() throws Exception { String Sql = "SELECT customer.c_name, customer.c_address, orders.o_id, orders.o_amount " + "FROM cp.`lateraljoin/nested-customer.parquet` customer, LATERAL " + - "(SELECT O.o_id, O.o_amount FROM UNNEST(customer.orders) O WHERE O.o_amount > 10 LIMIT 1) orders"; + "(SELECT t.ord.o_id as o_id, t.ord.o_amount as o_amount FROM UNNEST(customer.orders) t(ord) WHERE t.ord.o_amount > 10 LIMIT 1) orders"; test(Sql); } @@ -75,7 +73,7 @@ public class TestE2EUnnestAndLateral extends BaseTestQuery { public void testOuterApply_WithFilterAndLimitInSubQuery() throws Exception { String Sql = "SELECT customer.c_name, customer.c_address, orders.o_id, orders.o_amount " + "FROM cp.`lateraljoin/nested-customer.parquet` customer OUTER APPLY " + - "(SELECT O.o_id, O.o_amount FROM UNNEST(customer.orders) O WHERE O.o_amount > 10 LIMIT 1) orders"; + "(SELECT t.ord.o_id as o_id , t.ord.o_amount as o_amount FROM UNNEST(customer.orders) t(ord) WHERE t.ord.o_amount > 10 LIMIT 1) orders"; test(Sql); } @@ -83,18 +81,24 @@ public class TestE2EUnnestAndLateral extends BaseTestQuery { public void testLeftLateral_WithFilterAndLimitInSubQuery() throws Exception { String Sql = "SELECT customer.c_name, customer.c_address, orders.o_id, orders.o_amount " + "FROM cp.`lateraljoin/nested-customer.parquet` customer LEFT JOIN LATERAL " + - "(SELECT O.o_id, O.o_amount FROM UNNEST(customer.orders) O WHERE O.o_amount > 10 LIMIT 1) orders ON TRUE"; + "(SELECT t.ord.o_id as o_id, t.ord.o_amount as o_amount FROM UNNEST(customer.orders) t(ord) WHERE t.ord.o_amount > 10 LIMIT 1) orders ON TRUE"; test(Sql); } - @Ignore("Nested repeated type columns doesn't work here") @Test - public void testNestedUnnest() throws Exception { + public void testMultiUnnestAtSameLevel() throws Exception { String Sql = "EXPLAIN PLAN FOR SELECT customer.c_name, customer.c_address, U1.order_id, U1.order_amt," + " U1.itemName, U1.itemNum" + " FROM cp.`lateraljoin/nested-customer.parquet` customer, LATERAL" + - " (SELECT O.o_id AS order_id, O.o_amount AS order_amt, U2.item_name AS itemName, U2.item_num AS itemNum" + - " FROM UNNEST(customer.orders) O, LATERAL" + - " (SELECT I.i_name AS item_name, I.i_number AS item_num FROM UNNEST(O.items) AS I) AS U2) AS U1"; + " (SELECT t.ord.o_id AS order_id, t.ord.o_amount AS order_amt, U2.item_name AS itemName, U2.item_num AS " + + "itemNum FROM UNNEST(customer.orders) t(ord) , LATERAL" + + " (SELECT t1.ord.i_name AS item_name, t1.ord.i_number AS item_num FROM UNNEST(t.ord) AS t1(ord)) AS U2) AS U1"; + test(Sql); + } + + @Test + public void testNestedUnnest() throws Exception { + String Sql = "select * from (select customer.orders as orders from cp.`lateraljoin/nested-customer.parquet` customer ) t1," + + " lateral ( select t.ord.items as items from unnest(t1.orders) t(ord) ) t2, unnest(t2.items) t3(item) "; test(Sql); } @@ -106,7 +110,7 @@ public class TestE2EUnnestAndLateral extends BaseTestQuery { public void testMultipleBatchesLateralQuery() throws Exception { String sql = "SELECT customer.c_name, customer.c_address, orders.o_orderkey, orders.o_totalprice " + "FROM dfs.`lateraljoin/multipleFiles` customer, LATERAL " + - "(SELECT O.o_orderkey, O.o_totalprice FROM UNNEST(customer.c_orders) O) orders"; + "(SELECT t.ord.o_orderkey as o_orderkey, t.ord.o_totalprice as o_totalprice FROM UNNEST(customer.c_orders) t(ord)) orders"; test(sql); } @@ -114,7 +118,7 @@ public class TestE2EUnnestAndLateral extends BaseTestQuery { public void testMultipleBatchesLateral_WithLimitInSubQuery() throws Exception { String sql = "SELECT customer.c_name, customer.c_address, orders.o_orderkey, orders.o_totalprice " + "FROM dfs.`lateraljoin/multipleFiles` customer, LATERAL " + - "(SELECT O.o_orderkey, O.o_totalprice FROM UNNEST(customer.c_orders) O LIMIT 10) orders"; + "(SELECT t.ord.o_orderkey as o_orderkey, t.ord.o_totalprice as o_totalprice FROM UNNEST(customer.c_orders) t(ord) LIMIT 10) orders"; test(sql); } @@ -122,7 +126,7 @@ public class TestE2EUnnestAndLateral extends BaseTestQuery { public void testMultipleBatchesLateral_WithLimitFilterInSubQuery() throws Exception { String sql = "SELECT customer.c_name, customer.c_address, orders.o_orderkey, orders.o_totalprice " + "FROM dfs.`lateraljoin/multipleFiles` customer, LATERAL " + - "(SELECT O.o_orderkey, O.o_totalprice FROM UNNEST(customer.c_orders) O WHERE O.o_totalprice > 100000 LIMIT 2) " + + "(SELECT t.ord.o_orderkey as o_orderkey, t.ord.o_totalprice as o_totalprice FROM UNNEST(customer.c_orders) t(ord) WHERE t.ord.o_totalprice > 100000 LIMIT 2) " + "orders"; test(sql); } @@ -139,7 +143,7 @@ public class TestE2EUnnestAndLateral extends BaseTestQuery { String sql = "SELECT customer.c_name, customer.c_address, orders.o_orderkey, orders.o_totalprice " + "FROM dfs.`lateraljoin/multipleFiles` customer, LATERAL " + - "(SELECT O.o_orderkey, O.o_totalprice FROM UNNEST (customer.c_orders) O) orders"; + "(SELECT t.ord.o_orderkey as o_orderkey, t.ord.o_totalprice as o_totalprice FROM UNNEST (customer.c_orders) t(ord)) orders"; test(sql); } catch (Exception ex) { fail(); @@ -155,7 +159,7 @@ public class TestE2EUnnestAndLateral extends BaseTestQuery { String sql = "SELECT customer.c_name, customer.c_address, orders.o_orderkey, orders.o_totalprice " + "FROM dfs.`lateraljoin/multipleFiles` customer, LATERAL " + - "(SELECT O.o_orderkey, O.o_totalprice FROM UNNEST(customer.c_orders) O) orders"; + "(SELECT t.ord.o_orderkey as o_orderkey, t.ord.o_totalprice as o_totalprice FROM UNNEST(customer.c_orders) t(ord)) orders"; test(sql); } catch (Exception ex) { fail(); @@ -171,7 +175,7 @@ public class TestE2EUnnestAndLateral extends BaseTestQuery { String sql = "SELECT customer.c_name, customer.c_address, customer.c_nationkey, orders.o_orderkey, " + "orders.o_totalprice FROM dfs.`lateraljoin/multipleFiles` customer, LATERAL " + - "(SELECT O.o_orderkey, O.o_totalprice, O.o_shippriority FROM UNNEST(customer.c_orders) O) orders"; + "(SELECT t.ord.o_orderkey as o_orderkey, t.ord.o_totalprice as o_totalprice, t.ord.o_shippriority o_shippriority FROM UNNEST(customer.c_orders) t(ord)) orders"; test(sql); } catch (Exception ex) { @@ -189,7 +193,7 @@ public class TestE2EUnnestAndLateral extends BaseTestQuery { public void testMultipleBatchesLateral_WithLimitInParent() throws Exception { String sql = "SELECT customer.c_name, customer.c_address, orders.o_orderkey, orders.o_totalprice " + "FROM dfs.`lateraljoin/multipleFiles` customer, LATERAL " + - "(SELECT O.o_orderkey, O.o_totalprice FROM UNNEST(customer.c_orders) O WHERE O.o_totalprice > 100000 LIMIT 2) " + + "(SELECT t.ord.o_orderkey as o_orderkey, t.ord.o_totalprice as o_totalprice FROM UNNEST(customer.c_orders) t(ord) WHERE t.ord.o_totalprice > 100000 LIMIT 2) " + "orders LIMIT 1"; test(sql); } @@ -198,7 +202,7 @@ public class TestE2EUnnestAndLateral extends BaseTestQuery { public void testMultipleBatchesLateral_WithFilterInParent() throws Exception { String sql = "SELECT customer.c_name, customer.c_address, orders.o_orderkey, orders.o_totalprice " + "FROM dfs.`lateraljoin/multipleFiles` customer, LATERAL " + - "(SELECT O.o_orderkey, O.o_totalprice FROM UNNEST(customer.c_orders) O WHERE O.o_totalprice > 100000 LIMIT 2) " + + "(SELECT t.ord.o_orderkey as o_orderkey, t.ord.o_totalprice as o_totalprice FROM UNNEST(customer.c_orders) t(ord) WHERE t.ord.o_totalprice > 100000 LIMIT 2) " + "orders WHERE orders.o_totalprice > 240000"; test(sql); } @@ -207,7 +211,7 @@ public class TestE2EUnnestAndLateral extends BaseTestQuery { public void testMultipleBatchesLateral_WithGroupByInParent() throws Exception { String sql = "SELECT customer.c_name, avg(orders.o_totalprice) AS avgPrice " + "FROM dfs.`lateraljoin/multipleFiles` customer, LATERAL " + - "(SELECT O.o_totalprice FROM UNNEST(customer.c_orders) O WHERE O.o_totalprice > 100000 LIMIT 2) " + + "(SELECT t.ord.o_totalprice as o_totalprice FROM UNNEST(customer.c_orders) t(ord) WHERE t.ord.o_totalprice > 100000 LIMIT 2) " + "orders GROUP BY customer.c_name"; test(sql); } @@ -216,7 +220,7 @@ public class TestE2EUnnestAndLateral extends BaseTestQuery { public void testMultipleBatchesLateral_WithOrderByInParent() throws Exception { String sql = "SELECT customer.c_name, customer.c_address, orders.o_orderkey, orders.o_totalprice " + "FROM dfs.`lateraljoin/multipleFiles` customer, LATERAL " + - "(SELECT O.o_orderkey, O.o_totalprice FROM UNNEST(customer.c_orders) O) orders " + + "(SELECT t.ord.o_orderkey as o_orderkey, t.ord.o_totalprice as o_totalprice FROM UNNEST(customer.c_orders) t(ord)) orders " + "ORDER BY orders.o_orderkey"; test(sql); } diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestLateralPlans.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestLateralPlans.java index 2125bd1..9e19729 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestLateralPlans.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestLateralPlans.java @@ -20,10 +20,15 @@ package org.apache.drill.exec.physical.impl.lateraljoin; import static org.junit.Assert.assertEquals; import org.apache.drill.PlanTestBase; +import org.apache.drill.common.exceptions.UserRemoteException; +import org.apache.drill.exec.ExecConstants; import org.apache.drill.test.BaseTestQuery; +import org.apache.drill.test.ClientFixture; +import org.apache.drill.test.ClusterFixture; +import org.apache.drill.test.ClusterFixtureBuilder; import org.junit.BeforeClass; -import org.junit.Ignore; import org.junit.Test; +import org.junit.Ignore; public class TestLateralPlans extends BaseTestQuery { @@ -40,8 +45,8 @@ public class TestLateralPlans extends BaseTestQuery { @Test public void testLateralSql() throws Exception { - String Sql = "select t.c_name, t2.o_shop as o_shop from cp.`lateraljoin/nested-customer.json` t," - + " unnest(t.orders) t2 limit 1"; + String Sql = "select t.c_name, t2.ord.o_shop as o_shop from cp.`lateraljoin/nested-customer.json` t," + + " unnest(t.orders) t2(ord) limit 1"; testBuilder() .unOrdered() .sqlQuery(Sql) @@ -52,21 +57,20 @@ public class TestLateralPlans extends BaseTestQuery { @Test public void testExplainLateralSql() throws Exception { - String Sql = "explain plan without implementation for" - + " select t.c_name, t2.o_shop as o_shop from cp.`lateraljoin/nested-customer.json` t," - + " unnest(t.orders) t2 limit 1"; + String Sql = "explain plan without implementation for select t.c_name, t2.ord.o_shop as o_shop from cp.`lateraljoin/nested-customer.json` t," + + " unnest(t.orders) t2(ord) limit 1"; test(Sql); } @Test public void testFilterPushCorrelate() throws Exception { test("alter session set `planner.slice_target`=1"); - String query = "select t.c_name, t2.o_shop as o_shop from cp.`lateraljoin/nested-customer.json` t," - + " unnest(t.orders) t2 where t.c_name='customer1' AND t2.o_shop='Meno Park 1st' "; - PlanTestBase.testPlanMatchingPatterns(query, - new String[] {"Correlate(.*[\n\r])+.*Filter(.*[\n\r])+.*Scan(.*[\n\r])+.*Filter"}, - new String[]{} - ); + String query = "select t.c_name, t2.ord.o_shop as o_shop from cp.`lateraljoin/nested-customer.json` t," + + " unnest(t.orders) t2(ord) where t.c_name='customer1' AND t2.ord.o_shop='Meno Park 1st' "; + + PlanTestBase.testPlanMatchingPatterns(query, new String[]{"Correlate(.*[\n\r])+.*Filter(.*[\n\r])+.*Scan(.*[\n\r])+.*Filter"}, + new String[]{}); + testBuilder() .unOrdered() .sqlQuery(query) @@ -76,41 +80,199 @@ public class TestLateralPlans extends BaseTestQuery { } @Test - @Ignore("naming of single column") public void testLateralSqlPlainCol() throws Exception { - String Sql = "select t.c_name, t2.c_phone from cp.`lateraljoin/nested-customer.json` t, unnest(t.c_phone) t2 limit 1"; + String Sql = "select t.c_name, t2.phone as c_phone from cp.`lateraljoin/nested-customer.json` t," + + " unnest(t.c_phone) t2(phone) limit 1"; testBuilder() .unOrdered() .sqlQuery(Sql) - .baselineColumns("c_name", "c_phone_flat") + .baselineColumns("c_name", "c_phone") .baselineValues("customer1", "6505200001") .go(); - } @Test public void testLateralSqlStar() throws Exception { - String Sql = "select * from cp.`lateraljoin/nested-customer.json` t, unnest(t.orders) t2 limit 1"; - test(Sql); + String Sql = "select * from cp.`lateraljoin/nested-customer.json` t, unnest(t.orders) Orders(ord) limit 0"; + + testBuilder() + .unOrdered() + .sqlQuery(Sql) + .baselineColumns("c_name", "c_id", "c_phone", "orders", "c_address", "ord") + .expectsEmptyResultSet() + .go(); + } + + @Test + public void testLateralSqlStar2() throws Exception { + String Sql = "select c.* from cp.`lateraljoin/nested-customer.json` c, unnest(c.orders) Orders(ord) limit 0"; + + testBuilder() + .unOrdered() + .sqlQuery(Sql) + .baselineColumns("c_name", "c_id", "c_phone", "orders", "c_address") + .expectsEmptyResultSet() + .go(); + } + + @Test + public void testLateralSqlStar3() throws Exception { + String Sql = "select Orders.*, c.* from cp.`lateraljoin/nested-customer.json` c, unnest(c.orders) Orders(ord) limit 0"; + + testBuilder() + .unOrdered() + .sqlQuery(Sql) + .baselineColumns("ord","c_name", "c_id", "c_phone", "orders", "c_address") + .expectsEmptyResultSet() + .go(); + } + + @Test + public void testLateralSqlStar4() throws Exception { + String Sql = "select Orders.* from cp.`lateraljoin/nested-customer.json` c, unnest(c.orders) Orders(ord) limit 0"; + + testBuilder() + .unOrdered() + .sqlQuery(Sql) + .baselineColumns("ord") + .expectsEmptyResultSet() + .go(); } @Test - @Ignore("To be fixed: how to specify columns names for table alias in dynamic case") public void testLateralSqlWithAS() throws Exception { - String Sql = "select t.c_name, t2.o_shop from cp.`lateraljoin/nested-customer.parquet` t," - + " unnest(t.orders) as t2(o_shop) limit 1"; + String Sql = "select t.c_name, t2.orders from cp.`lateraljoin/nested-customer.parquet` t," + + " unnest(t.orders) as t2(orders)"; + String baselineQuery = "select t.c_name, t2.orders from cp.`lateraljoin/nested-customer.parquet` t inner join" + + " (select c_name, flatten(orders) from cp" + + ".`lateraljoin/nested-customer.parquet` ) as t2(name, orders) on t.c_name = t2.name"; + testBuilder() .unOrdered() .sqlQuery(Sql) - .baselineColumns("c_name", "o_shop") - .baselineValues("customer1", "Meno Park 1st") + .sqlBaselineQuery(baselineQuery) .go(); + } + + @Test + public void testMultiUnnestLateralAtSameLevel() throws Exception { + String Sql = "select t.c_name, t2.orders, t3.orders from cp.`lateraljoin/nested-customer.parquet` t," + + " LATERAL ( select t2.orders from unnest(t.orders) as t2(orders)) as t2, LATERAL " + + "(select t3.orders from unnest(t.orders) as t3(orders)) as t3"; + String baselineQuery = "select t.c_name, t2.orders, t3.orders from cp.`lateraljoin/nested-customer.parquet` t inner join" + + " (select c_name, flatten(orders) from cp.`lateraljoin/nested-customer.parquet` ) as t2 (name, orders) on t.c_name = t2.name " + + " inner join (select c_name, flatten(orders) from cp.`lateraljoin/nested-customer.parquet` ) as t3(name, orders) on t.c_name = t3.name"; + testBuilder() + .unOrdered() + .sqlQuery(Sql) + .sqlBaselineQuery(baselineQuery) + .go(); } @Test public void testSubQuerySql() throws Exception { - String Sql = "select t2.os.* from (select t.orders as os from cp.`lateraljoin/nested-customer.parquet` t) t2"; - test(Sql); + String Sql = "select t.c_name, d1.items as items0 , t3.items as items1 from cp.`lateraljoin/nested-customer.parquet` t," + + " lateral (select t2.ord.items as items from unnest(t.orders) t2(ord)) d1," + + " unnest(d1.items) t3(items)"; + + String baselineQuery = "select t.c_name, t3.orders.items as items0, t3.items as items1 from cp.`lateraljoin/nested-customer.parquet` t " + + " inner join (select c_name, f, flatten(t1.f.items) from (select c_name, flatten(orders) as f from cp.`lateraljoin/nested-customer.parquet`) as t1 ) " + + "t3(name, orders, items) on t.c_name = t3.name "; + testBuilder() + .unOrdered() + .sqlQuery(Sql) + .sqlBaselineQuery(baselineQuery) + .go(); + } + + @Test + public void testUnnestWithFilter() throws Exception { + String Sql = "select t.c_name, d1.items as items0, t3.items as items1 from cp.`lateraljoin/nested-customer.parquet` t," + + " lateral (select t2.ord.items as items from unnest(t.orders) t2(ord)) d1," + + " unnest(d1.items) t3(items) where t.c_id > 1"; + + String baselineQuery = "select t.c_name, t3.orders.items as items0, t3.items as items1 from cp.`lateraljoin/nested-customer.parquet` t " + + " inner join (select c_name, f, flatten(t1.f.items) from (select c_name, flatten(orders) as f from cp.`lateraljoin/nested-customer.parquet`) as t1 ) " + + "t3(name, orders, items) on t.c_name = t3.name where t.c_id > 1"; + testBuilder() + .unOrdered() + .sqlQuery(Sql) + .sqlBaselineQuery(baselineQuery) + .go(); + } + + @Test + @Ignore () + public void testUnnestWithAggInSubquery() throws Exception { + String Sql = "select t.c_name, t3.items from cp.`lateraljoin/nested-customer.parquet` t," + + " lateral (select t2.ord.items as items from unnest(t.orders) t2(ord)) d1," + + " lateral (select avg(t3.items.i_number) from unnest(d1.items) t3(items)) where t.c_id > 1"; + + String baselineQuery = "select t.c_name, avg(t3.items.i_number) from cp.`lateraljoin/nested-customer.parquet` t " + + " inner join (select c_name, f, flatten(t1.f.items) from (select c_name, flatten(orders) as f from cp.`lateraljoin/nested-customer.parquet`) as t1 ) " + + "t3(name, orders, items) on t.c_name = t3.name where t.c_id > 1 group by t.c_name"; + + ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher) + .setOptionDefault(ExecConstants.ENABLE_UNNEST_LATERAL_KEY, true); + + try (ClusterFixture cluster = builder.build(); + ClientFixture client = cluster.clientFixture()) { + client + .queryBuilder() + .sql(Sql) + .run(); + } + } + + @Test + public void testUnnestWithAggOnOuterTable() throws Exception { + String Sql = "select avg(d2.inum) from cp.`lateraljoin/nested-customer.parquet` t," + + " lateral (select t2.ord.items as items from unnest(t.orders) t2(ord)) d1," + + " lateral (select t3.items.i_number as inum from unnest(d1.items) t3(items)) d2 where t.c_id > 1 group by t.c_id"; + + String baselineQuery = "select avg(t3.items.i_number) from cp.`lateraljoin/nested-customer.parquet` t " + + " inner join (select c_name, f, flatten(t1.f.items) from (select c_name, flatten(orders) as f from cp.`lateraljoin/nested-customer.parquet`) as t1 ) " + + "t3(name, orders, items) on t.c_name = t3.name where t.c_id > 1 group by t.c_id"; + + testBuilder() + .unOrdered() + .sqlQuery(Sql) + .sqlBaselineQuery(baselineQuery) + .go(); + } + + @Test + public void testUnnestTableAndColumnAlias() throws Exception { + String Sql = "select t.c_name from cp.`lateraljoin/nested-customer.json` t, unnest(t.orders) "; + ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher) + .setOptionDefault(ExecConstants.ENABLE_UNNEST_LATERAL_KEY, true); + + try (ClusterFixture cluster = builder.build(); + ClientFixture client = cluster.clientFixture()) { + client + .queryBuilder() + .sql(Sql) + .run(); + } catch (UserRemoteException ex) { + assert(ex.getMessage().contains("Alias table and column name are required for UNNEST")); + } + } + + @Test + public void testUnnestColumnAlias() throws Exception { + String Sql = "select t.c_name, t2.orders from cp.`lateraljoin/nested-customer.json` t, unnest(t.orders) t2"; + ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher) + .setOptionDefault(ExecConstants.ENABLE_UNNEST_LATERAL_KEY, true); + + try (ClusterFixture cluster = builder.build(); + ClientFixture client = cluster.clientFixture()) { + client + .queryBuilder() + .sql(Sql) + .run(); + } catch (UserRemoteException ex) { + assert(ex.getMessage().contains("Alias table and column name are required for UNNEST")); + } } } diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestCorrectness.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestCorrectness.java index 8ec0c96..137966b 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestCorrectness.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestCorrectness.java @@ -42,7 +42,6 @@ import org.apache.drill.test.rowSet.RowSetBuilder; import org.apache.drill.test.rowSet.schema.SchemaBuilder; import org.junit.AfterClass; import org.junit.BeforeClass; -import org.junit.Ignore; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -133,7 +132,6 @@ import static org.junit.Assert.assertTrue; } @Test - @Ignore("With DRILL-6321 commits, Unnest's output could be multiplec olumns") public void testUnnestMapColumn() { Object[][] data = getMapData(); diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java index 70a32f8..9318c51 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java @@ -28,10 +28,13 @@ import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.physical.base.LateralContract; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.config.LateralJoinPOP; +import org.apache.drill.exec.physical.config.Project; import org.apache.drill.exec.physical.config.UnnestPOP; import org.apache.drill.exec.physical.impl.MockRecordBatch; import org.apache.drill.exec.physical.impl.join.LateralJoinBatch; +import org.apache.drill.exec.physical.impl.project.ProjectRecordBatch; import org.apache.drill.exec.physical.impl.sort.RecordBatchData; +import org.apache.drill.exec.planner.logical.DrillLogicalTestutils; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.metadata.TupleMetadata; import org.apache.drill.exec.record.VectorContainer; @@ -466,7 +469,6 @@ public class TestUnnestWithLateralCorrectness extends SubOperatorTest { } - @Test public void testUnnestNonArrayColumn() { @@ -555,8 +557,15 @@ public class TestUnnestWithLateralCorrectness extends SubOperatorTest { final UnnestRecordBatch unnestBatch = new UnnestRecordBatch(unnestPopConfig, fixture.getFragmentContext()); + // project is required to rename the columns so as to disambiguate the same column name from + // unnest operator and the regular scan. + final Project projectPopConfig = new Project(DrillLogicalTestutils.parseExprs("unnestColumn", "unnestColumn1"), null); + + final ProjectRecordBatch projectBatch = + new ProjectRecordBatch( projectPopConfig, unnestBatch, fixture.getFragmentContext()); + final LateralJoinBatch lateralJoinBatch = - new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(), incomingMockBatch, unnestBatch); + new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(), incomingMockBatch, projectBatch); // set pointer to Lateral in unnest unnestBatch.setIncoming((LateralContract) lateralJoinBatch); diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java index 66354ff..b8a219b 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java @@ -21,9 +21,6 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; -import org.antlr.runtime.ANTLRStringStream; -import org.antlr.runtime.CommonTokenStream; -import org.antlr.runtime.RecognitionException; import org.apache.calcite.rel.RelFieldCollation; import org.apache.drill.exec.coord.ClusterCoordinator; import org.apache.drill.exec.memory.BufferAllocator; @@ -34,6 +31,7 @@ import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.FragmentStats; import org.apache.drill.exec.physical.base.PhysicalVisitor; import org.apache.drill.exec.planner.PhysicalPlanReader; +import org.apache.drill.exec.planner.logical.DrillLogicalTestutils; import org.apache.drill.exec.proto.CoordinationProtos; import org.apache.drill.exec.rpc.control.Controller; import org.apache.drill.exec.rpc.control.WorkEventBus; @@ -46,12 +44,8 @@ import org.apache.drill.test.BaseDirTestWatcher; import org.apache.drill.test.DrillTestWrapper; import org.apache.drill.common.config.DrillConfig; import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.common.expression.FieldReference; import org.apache.drill.common.expression.LogicalExpression; -import org.apache.drill.common.expression.PathSegment; import org.apache.drill.common.expression.SchemaPath; -import org.apache.drill.common.expression.parser.ExprLexer; -import org.apache.drill.common.expression.parser.ExprParser; import org.apache.drill.common.logical.data.JoinCondition; import org.apache.drill.common.logical.data.NamedExpression; import org.apache.drill.common.logical.data.Order; @@ -131,33 +125,19 @@ public class PhysicalOpUnitTestBase extends ExecTest { @Override protected LogicalExpression parseExpr(String expr) { - ExprLexer lexer = new ExprLexer(new ANTLRStringStream(expr)); - CommonTokenStream tokens = new CommonTokenStream(lexer); - ExprParser parser = new ExprParser(tokens); - try { - return parser.parse().e; - } catch (RecognitionException e) { - throw new RuntimeException("Error parsing expression: " + expr); - } + return DrillLogicalTestutils.parseExpr(expr); } protected Order.Ordering ordering(String expression, RelFieldCollation.Direction direction, RelFieldCollation.NullDirection nullDirection) { - return new Order.Ordering(direction, parseExpr(expression), nullDirection); + return DrillLogicalTestutils.ordering(expression, direction, nullDirection); } protected JoinCondition joinCond(String leftExpr, String relationship, String rightExpr) { - return new JoinCondition(relationship, parseExpr(leftExpr), parseExpr(rightExpr)); + return DrillLogicalTestutils.joinCond(leftExpr, relationship, rightExpr); } protected List<NamedExpression> parseExprs(String... expressionsAndOutputNames) { - Preconditions.checkArgument(expressionsAndOutputNames.length %2 ==0, "List of expressions and output field names" + - " is not complete, each expression must explicitly give and output name,"); - List<NamedExpression> ret = new ArrayList<>(); - for (int i = 0; i < expressionsAndOutputNames.length; i += 2) { - ret.add(new NamedExpression(parseExpr(expressionsAndOutputNames[i]), - new FieldReference(new SchemaPath(new PathSegment.NameSegment(expressionsAndOutputNames[i+1]))))); - } - return ret; + return DrillLogicalTestutils.parseExprs(expressionsAndOutputNames); } protected static class BatchIterator implements Iterable<VectorAccessible> { diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/planner/logical/DrillLogicalTestutils.java b/exec/java-exec/src/test/java/org/apache/drill/exec/planner/logical/DrillLogicalTestutils.java new file mode 100644 index 0000000..a8927bc --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/planner/logical/DrillLogicalTestutils.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.planner.logical; + +import com.google.common.base.Preconditions; +import org.antlr.runtime.ANTLRStringStream; +import org.antlr.runtime.CommonTokenStream; +import org.antlr.runtime.RecognitionException; +import org.apache.calcite.rel.RelFieldCollation; +import org.apache.drill.common.expression.FieldReference; +import org.apache.drill.common.expression.LogicalExpression; +import org.apache.drill.common.expression.PathSegment; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.common.expression.parser.ExprLexer; +import org.apache.drill.common.expression.parser.ExprParser; +import org.apache.drill.common.logical.data.JoinCondition; +import org.apache.drill.common.logical.data.NamedExpression; +import org.apache.drill.common.logical.data.Order; + +import java.util.ArrayList; +import java.util.List; + +public class DrillLogicalTestutils { + public static Order.Ordering ordering(String expression, + RelFieldCollation.Direction direction, + RelFieldCollation.NullDirection nullDirection) { + return new Order.Ordering(direction, parseExpr(expression), nullDirection); + } + + public static JoinCondition joinCond(String leftExpr, String relationship, String rightExpr) { + return new JoinCondition(relationship, parseExpr(leftExpr), parseExpr(rightExpr)); + } + + public static List<NamedExpression> parseExprs(String... expressionsAndOutputNames) { + Preconditions.checkArgument(expressionsAndOutputNames.length % 2 == 0, + "List of expressions and output field names" + " is not complete, each expression must explicitly give and output name,"); + List<NamedExpression> ret = new ArrayList<>(); + for (int i = 0; i < expressionsAndOutputNames.length; i += 2) { + ret.add(new NamedExpression(parseExpr(expressionsAndOutputNames[i]), + new FieldReference(new SchemaPath(new PathSegment.NameSegment(expressionsAndOutputNames[i + 1]))))); + } + return ret; + } + + public static LogicalExpression parseExpr(String expr) { + ExprLexer lexer = new ExprLexer(new ANTLRStringStream(expr)); + CommonTokenStream tokens = new CommonTokenStream(lexer); + ExprParser parser = new ExprParser(tokens); + try { + return parser.parse().e; + } catch (RecognitionException e) { + throw new RuntimeException("Error parsing expression: " + expr); + } + } +} \ No newline at end of file diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/DrillTestWrapper.java b/exec/java-exec/src/test/java/org/apache/drill/test/DrillTestWrapper.java index 57bc79c..0dfc1f7 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/test/DrillTestWrapper.java +++ b/exec/java-exec/src/test/java/org/apache/drill/test/DrillTestWrapper.java @@ -515,6 +515,15 @@ public class DrillTestWrapper { addTypeInfoIfMissing(actual.get(0), testBuilder); addToMaterializedResults(actualRecords, actual, loader); + // If actual result record number is 0, + // and the baseline records does not exist, and baselineColumns provided, + // compare actual column number/names with expected columns + if (actualRecords.size() == 0 + && (baselineRecords == null || baselineRecords.size()==0) + && baselineColumns != null) { + checkColumnDef(loader.getSchema()); + } + // If baseline data was not provided to the test builder directly, we must run a query for the baseline, this includes // the cases where the baseline is stored in a file. if (baselineRecords == null) { @@ -531,6 +540,18 @@ public class DrillTestWrapper { } } + public void checkColumnDef(BatchSchema batchSchema) throws Exception{ + assert (batchSchema != null && batchSchema.getFieldCount()==baselineColumns.length); + for (int i=0; i<baselineColumns.length; ++i) { + if (!SchemaPath.parseFromString(baselineColumns[i]).equals( + SchemaPath.parseFromString(batchSchema.getColumn(i).getName()))) { + throw new Exception(i + "the expected column name is not matching: " + + baselineColumns[i] + " is not " + + batchSchema.getColumn(i).getName()); + } + } + } + /** * Use this method only if necessary to validate one query against another. If you are just validating against a * baseline file use one of the simpler interfaces that will write the validation query for you. diff --git a/pom.xml b/pom.xml index 620f73c..206af7e 100644 --- a/pom.xml +++ b/pom.xml @@ -45,7 +45,7 @@ <dep.guava.version>18.0</dep.guava.version> <forkCount>2</forkCount> <parquet.version>1.8.1-drill-r0</parquet.version> - <calcite.version>1.16.0-drill-r1</calcite.version> + <calcite.version>1.16.0-drill-r3</calcite.version> <avatica.version>1.11.0</avatica.version> <janino.version>2.7.6</janino.version> <sqlline.version>1.1.9-drill-r7</sqlline.version> -- To stop receiving notification emails like this one, please contact par...@apache.org.