This is an automated email from the ASF dual-hosted git repository.

parthc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 3af718f050f28210d4c5dcf359c617b2300dd736
Author: hmaduri <hmad...@maprtech.com>
AuthorDate: Tue May 15 13:51:45 2018 -0700

    DRILL-6431: Unnest operator requires a table alias and a single column alias to be specified.
    Fixes issues related to star column renaming and same-field-name renaming,
    and enforces that a column alias is required for the unnest operator.
---
 .../java/org/apache/drill/exec/ExecConstants.java  |   1 +
 .../physical/impl/unnest/UnnestRecordBatch.java    |  16 +-
 .../drill/exec/planner/physical/CorrelatePrel.java |  45 ++++-
 .../drill/exec/planner/physical/UnnestPrel.java    |   2 +-
 .../planner/physical/visitor/BasePrelVisitor.java  |  12 ++
 .../physical/visitor/JoinPrelRenameVisitor.java    |  38 +++-
 .../exec/planner/physical/visitor/PrelVisitor.java |   4 +
 .../physical/visitor/PrelVisualizerVisitor.java    |  15 +-
 .../physical/visitor/StarColumnConverter.java      |  28 ++-
 .../drill/exec/planner/sql/SqlConverter.java       |   7 +
 .../impl/lateraljoin/TestE2EUnnestAndLateral.java  |  48 ++---
 .../impl/lateraljoin/TestLateralPlans.java         | 212 ++++++++++++++++++---
 .../impl/unnest/TestUnnestCorrectness.java         |   2 -
 .../unnest/TestUnnestWithLateralCorrectness.java   |  13 +-
 .../exec/physical/unit/PhysicalOpUnitTestBase.java |  30 +--
 .../planner/logical/DrillLogicalTestutils.java     |  70 +++++++
 .../org/apache/drill/test/DrillTestWrapper.java    |  21 ++
 pom.xml                                            |   2 +-
 18 files changed, 454 insertions(+), 112 deletions(-)
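
With this change an UNNEST used as a lateral table function must carry both a table alias and
a column alias; queries that relied on the implicit "<column>_flat" output name have to name
the unnested column explicitly. A minimal sketch of the new query shape, mirroring the
rewritten tests further down (table and column names are illustrative):

    // Rejected after this patch: no alias on the UNNEST at all.
    //   select t.c_name from cp.`lateraljoin/nested-customer.json` t, unnest(t.orders)
    // Accepted: table alias t2 plus column alias ord; fields are reached as t2.ord.<field>.
    String sql = "select t.c_name, t2.ord.o_shop as o_shop "
        + "from cp.`lateraljoin/nested-customer.json` t, unnest(t.orders) t2(ord) limit 1";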

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index b4bed59..1070d76 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -71,6 +71,7 @@ public final class ExecConstants {
  public static final String SPOOLING_BUFFER_DELETE = "drill.exec.buffer.spooling.delete";
  public static final String SPOOLING_BUFFER_MEMORY = "drill.exec.buffer.spooling.size";
  public static final String BATCH_PURGE_THRESHOLD = "drill.exec.sort.purge.threshold";
+  public static final String ENABLE_UNNEST_LATERAL_KEY = "planner.enable_unnest_lateral";
 
   // Spill boot-time Options common to all spilling operators
   // (Each individual operator may override the common options)
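
The new constant exposes the "planner.enable_unnest_lateral" option key. A minimal sketch of how
the tests later in this patch turn the option on through the cluster fixture (setup abbreviated;
dirTestWatcher and the query string come from the surrounding test class):

    ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
        .setOptionDefault(ExecConstants.ENABLE_UNNEST_LATERAL_KEY, true);
    try (ClusterFixture cluster = builder.build();
         ClientFixture client = cluster.clientFixture()) {
      client.queryBuilder().sql(sql).run();   // sql: a LATERAL/UNNEST query under test
    }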
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
index ed5d91c..57a0ade 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
@@ -21,7 +21,6 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.FieldReference;
-import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.exception.SchemaChangeException;
@@ -37,11 +36,9 @@ import org.apache.drill.exec.record.TransferPair;
 import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.vector.ValueVector;
-import org.apache.drill.exec.vector.complex.MapVector;
 import org.apache.drill.exec.vector.complex.RepeatedMapVector;
 import org.apache.drill.exec.vector.complex.RepeatedValueVector;
 
-import java.util.Iterator;
 import java.util.List;
 
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK;
@@ -351,23 +348,14 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO
     recordCount = 0;
     final List<TransferPair> transfers = Lists.newArrayList();
 
-    //TODO: fixthis once planner changes are done
     final FieldReference fieldReference =
-        new FieldReference(SchemaPath.getSimplePath(popConfig.getColumn().toString() + "_flat"));
+        new FieldReference(popConfig.getColumn());
 
     final TransferPair transferPair = getUnnestFieldTransferPair(fieldReference);
 
     final ValueVector unnestVector = transferPair.getTo();
     transfers.add(transferPair);
-    if (unnestVector instanceof MapVector) {
-      Iterator<ValueVector> it = unnestVector.iterator();
-      while (it.hasNext()) {
-        container.add(it.next());
-      }
-    }
-    else {
-      container.add(unnestVector);
-    }
+    container.add(unnestVector);
     logger.debug("Added transfer for unnest expression.");
     container.buildSchema(SelectionVectorMode.NONE);
 
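
The unnest output vector is now added to the container under the column name carried by the
operator config instead of under "<column>_flat", and the special case that flattened a
MapVector into its children is gone, so unnesting a repeated map yields a single map column
(see the re-enabled testUnnestMapColumn later in this patch). A condensed sketch of the
resulting setup, using only names that appear in the hunk above:

    // The planner now guarantees a usable output name, so no "_flat" suffix is needed.
    final FieldReference fieldReference = new FieldReference(popConfig.getColumn());
    final TransferPair transferPair = getUnnestFieldTransferPair(fieldReference);
    transfers.add(transferPair);
    container.add(transferPair.getTo());   // a single output vector, whatever its type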
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/CorrelatePrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/CorrelatePrel.java
index 9d308f0..9938db1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/CorrelatePrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/CorrelatePrel.java
@@ -17,21 +17,29 @@
  */
 package org.apache.drill.exec.planner.physical;
 
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Correlate;
 import org.apache.calcite.rel.core.CorrelationId;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexUtil;
 import org.apache.calcite.sql.SemiJoinType;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.config.LateralJoinPOP;
 import org.apache.drill.exec.planner.common.DrillCorrelateRelBase;
+import org.apache.drill.exec.planner.common.DrillJoinRelBase;
 import org.apache.drill.exec.planner.physical.visitor.PrelVisitor;
 import org.apache.drill.exec.record.BatchSchema;
 
 import java.io.IOException;
 import java.util.Iterator;
+import java.util.List;
 
 public class CorrelatePrel extends DrillCorrelateRelBase implements Prel {
 
@@ -60,10 +68,45 @@ public class CorrelatePrel extends DrillCorrelateRelBase implements Prel {
     return creator.addMetadata(this, ljoin);
   }
 
+  /**
+   * Check to make sure that the fields of the inputs are the same as the output field names.
+   * If not, insert a project renaming them.
+   */
+  public RelNode getCorrelateInput(int offset, RelNode input) {
+    Preconditions.checkArgument(DrillJoinRelBase.uniqueFieldNames(input.getRowType()));
+    final List<String> fields = getRowType().getFieldNames();
+    final List<String> inputFields = input.getRowType().getFieldNames();
+    final List<String> outputFields = fields.subList(offset, offset + inputFields.size());
+    if (!outputFields.equals(inputFields)) {
+      // Ensure that input field names are the same as output field names.
+      // If there are duplicate field names on left and right, fields will get
+      // lost.
+      // In such a case, we need to insert a rename Project on top of the input.
+      return rename(input, input.getRowType().getFieldList(), outputFields);
+    } else {
+      return input;
+    }
+  }
+
+  private RelNode rename(RelNode input, List<RelDataTypeField> inputFields, List<String> outputFieldNames) {
+    List<RexNode> exprs = Lists.newArrayList();
+
+    for (RelDataTypeField field : inputFields) {
+      RexNode expr = input.getCluster().getRexBuilder().makeInputRef(field.getType(), field.getIndex());
+      exprs.add(expr);
+    }
+
+    RelDataType rowType = RexUtil.createStructType(input.getCluster().getTypeFactory(),
+        exprs, outputFieldNames, null);
+
+    ProjectPrel proj = new ProjectPrel(input.getCluster(), input.getTraitSet(), input, exprs, rowType);
+
+    return proj;
+  }
 
   @Override
  public <T, X, E extends Throwable> T accept(PrelVisitor<T, X, E> visitor, X value) throws E {
-    return visitor.visitPrel(this, value);
+    return visitor.visitCorrelate(this, value);
   }
 
   @Override
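
getCorrelateInput() mirrors the existing join-input renaming: if the slice of the correlate's
output field names that corresponds to an input does not match that input's own field names
(typically because the left and right sides expose a column of the same name), a ProjectPrel is
inserted to rename the input's columns; otherwise the input is returned unchanged. A hedged
illustration of how JoinPrelRenameVisitor (below) uses it; the field names are hypothetical:

    // Suppose the correlate output type is [c_name, orders, orders0] while the right input
    // produces [orders]: the names differ, so the right side comes back wrapped in a
    // ProjectPrel that renames orders to orders0. The left side is returned as-is.
    RelNode left = correlatePrel.getCorrelateInput(0, children.get(0));
    RelNode right = correlatePrel.getCorrelateInput(leftFieldCount, children.get(1));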
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnnestPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnnestPrel.java
index 1e87305..cd598eb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnnestPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnnestPrel.java
@@ -51,7 +51,7 @@ public class UnnestPrel extends DrillUnnestRelBase implements Prel {
 
   @Override
  public <T, X, E extends Throwable> T accept(PrelVisitor<T, X, E> visitor, X value) throws E {
-    return visitor.visitPrel(this, value);
+    return visitor.visitUnnest(this, value);
   }
 
   @Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/BasePrelVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/BasePrelVisitor.java
index 02c4709..04b7c18 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/BasePrelVisitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/BasePrelVisitor.java
@@ -24,6 +24,8 @@ import org.apache.drill.exec.planner.physical.ProjectPrel;
 import org.apache.drill.exec.planner.physical.ScanPrel;
 import org.apache.drill.exec.planner.physical.ScreenPrel;
 import org.apache.drill.exec.planner.physical.WriterPrel;
+import org.apache.drill.exec.planner.physical.UnnestPrel;
+import org.apache.drill.exec.planner.physical.CorrelatePrel;
 
 public class BasePrelVisitor<RETURN, EXTRA, EXCEP extends Throwable> implements PrelVisitor<RETURN, EXTRA, EXCEP> {
 
@@ -63,4 +65,14 @@ public class BasePrelVisitor<RETURN, EXTRA, EXCEP extends Throwable> implements
     throw new UnsupportedOperationException(String.format("No visit method defined for prel %s in visitor %s.", prel, this.getClass().getName()));
   }
 
+  @Override
+  public RETURN visitUnnest(UnnestPrel prel, EXTRA value) throws EXCEP {
+    return visitPrel(prel, value);
+  }
+
+  @Override
+  public RETURN visitCorrelate(CorrelatePrel prel, EXTRA value) throws EXCEP {
+    return visitPrel(prel, value);
+  }
+
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/JoinPrelRenameVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/JoinPrelRenameVisitor.java
index cddac41..dfb4036 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/JoinPrelRenameVisitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/JoinPrelRenameVisitor.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.planner.physical.visitor;
 import java.util.List;
 
 import org.apache.drill.exec.planner.physical.JoinPrel;
+import org.apache.drill.exec.planner.physical.CorrelatePrel;
 import org.apache.drill.exec.planner.physical.Prel;
 import org.apache.calcite.rel.RelNode;
 
@@ -35,25 +36,26 @@ public class JoinPrelRenameVisitor extends BasePrelVisitor<Prel, Void, RuntimeEx
 
   @Override
   public Prel visitPrel(Prel prel, Void value) throws RuntimeException {
+    return preparePrel(prel, getChildren(prel));
+  }
+
+  private List<RelNode> getChildren(Prel prel) {
     List<RelNode> children = Lists.newArrayList();
     for(Prel child : prel){
       child = child.accept(this, null);
       children.add(child);
     }
+    return children;
+  }
 
-    return (Prel) prel.copy(prel.getTraitSet(), children);
-
+  private Prel preparePrel(Prel prel, List<RelNode> renamedNodes) {
+    return (Prel) prel.copy(prel.getTraitSet(), renamedNodes);
   }
 
   @Override
   public Prel visitJoin(JoinPrel prel, Void value) throws RuntimeException {
 
-    List<RelNode> children = Lists.newArrayList();
-
-    for(Prel child : prel){
-      child = child.accept(this, null);
-      children.add(child);
-    }
+    List<RelNode> children = getChildren(prel);
 
     final int leftCount = children.get(0).getRowType().getFieldCount();
 
@@ -65,7 +67,25 @@ public class JoinPrelRenameVisitor extends BasePrelVisitor<Prel, Void, RuntimeEx
     reNamedChildren.add(left);
     reNamedChildren.add(right);
 
-    return (Prel) prel.copy(prel.getTraitSet(), reNamedChildren);
+    return preparePrel(prel, reNamedChildren);
   }
 
+  //TODO: consolidate this code with join column renaming.
+  @Override
+  public Prel visitCorrelate(CorrelatePrel prel, Void value) throws RuntimeException {
+
+    List<RelNode> children = getChildren(prel);
+
+    final int leftCount = children.get(0).getRowType().getFieldCount();
+
+    List<RelNode> reNamedChildren = Lists.newArrayList();
+
+    RelNode left = prel.getCorrelateInput(0, children.get(0));
+    RelNode right = prel.getCorrelateInput(leftCount, children.get(1));
+
+    reNamedChildren.add(left);
+    reNamedChildren.add(right);
+
+    return preparePrel(prel, reNamedChildren);
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/PrelVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/PrelVisitor.java
index bd81e98..0e7bbf6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/PrelVisitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/PrelVisitor.java
@@ -24,6 +24,8 @@ import org.apache.drill.exec.planner.physical.ProjectPrel;
 import org.apache.drill.exec.planner.physical.ScanPrel;
 import org.apache.drill.exec.planner.physical.ScreenPrel;
 import org.apache.drill.exec.planner.physical.WriterPrel;
+import org.apache.drill.exec.planner.physical.UnnestPrel;
+import org.apache.drill.exec.planner.physical.CorrelatePrel;
 
 
 public interface PrelVisitor<RETURN, EXTRA, EXCEP extends Throwable> {
@@ -36,5 +38,7 @@ public interface PrelVisitor<RETURN, EXTRA, EXCEP extends Throwable> {
   public RETURN visitJoin(JoinPrel prel, EXTRA value) throws EXCEP;
   public RETURN visitProject(ProjectPrel prel, EXTRA value) throws EXCEP;
   public RETURN visitPrel(Prel prel, EXTRA value) throws EXCEP;
+  public RETURN visitUnnest(UnnestPrel prel, EXTRA value) throws EXCEP;
+  public RETURN visitCorrelate(CorrelatePrel prel, EXTRA value) throws EXCEP;
 
 }
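
PrelVisitor gains dedicated visitUnnest and visitCorrelate callbacks, and BasePrelVisitor defaults
both to visitPrel, so existing visitors keep compiling while visitors that care (StarColumnConverter,
JoinPrelRenameVisitor, PrelVisualizerVisitor) can override them. A minimal sketch of a hypothetical
visitor, not part of this patch, that only special-cases unnest nodes:

    class UnnestCountingVisitor extends BasePrelVisitor<Void, Void, RuntimeException> {
      int unnestCount = 0;

      @Override
      public Void visitPrel(Prel prel, Void value) throws RuntimeException {
        for (Prel child : prel) {
          child.accept(this, value);   // generic recursive traversal
        }
        return null;
      }

      @Override
      public Void visitUnnest(UnnestPrel prel, Void value) throws RuntimeException {
        unnestCount++;                 // special handling for unnest nodes
        return visitPrel(prel, value);
      }
    }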
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/PrelVisualizerVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/PrelVisualizerVisitor.java
index 0ee685f..253325b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/PrelVisualizerVisitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/PrelVisualizerVisitor.java
@@ -24,6 +24,8 @@ import org.apache.drill.exec.planner.physical.ProjectPrel;
 import org.apache.drill.exec.planner.physical.ScanPrel;
 import org.apache.drill.exec.planner.physical.ScreenPrel;
 import org.apache.drill.exec.planner.physical.WriterPrel;
+import org.apache.drill.exec.planner.physical.UnnestPrel;
+import org.apache.drill.exec.planner.physical.CorrelatePrel;
 
 /**
  * Debug-time class that prints a PRel tree to the console for
@@ -69,7 +71,7 @@ public class PrelVisualizerVisitor
     }
 
     private void indent() {
-      for (int i = 0;  i < level;  i++) {
+      for (int i = 0; i < level; i++) {
         out.append(INDENT);
       }
     }
@@ -225,4 +227,15 @@ public class PrelVisualizerVisitor
     return null;
   }
 
+  @Override
+  public Void visitUnnest(UnnestPrel prel, VisualizationState value) throws Exception {
+    visitPrel(prel, value);
+    return null;
+  }
+
+  @Override
+  public Void visitCorrelate(CorrelatePrel prel, VisualizationState value) throws Exception {
+    visitPrel(prel, value);
+    return null;
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/StarColumnConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/StarColumnConverter.java
index 6e860cc..ac491e9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/StarColumnConverter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/StarColumnConverter.java
@@ -30,6 +30,7 @@ import org.apache.drill.exec.planner.physical.ProjectPrel;
 import org.apache.drill.exec.planner.physical.ScanPrel;
 import org.apache.drill.exec.planner.physical.ScreenPrel;
 import org.apache.drill.exec.planner.physical.WriterPrel;
+import org.apache.drill.exec.planner.physical.UnnestPrel;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeField;
@@ -189,14 +190,13 @@ public class StarColumnConverter extends BasePrelVisitor<Prel, Void, RuntimeExce
     return (Prel) prel.copy(prel.getTraitSet(), children);
   }
 
-  @Override
-  public Prel visitScan(ScanPrel scanPrel, Void value) throws RuntimeException {
-    if (StarColumnHelper.containsStarColumn(scanPrel.getRowType()) && prefixedForStar) {
+  private Prel prefixTabNameToStar(Prel prel, Void value) throws RuntimeException {
+    if (StarColumnHelper.containsStarColumn(prel.getRowType()) && prefixedForStar) {
 
       List<RexNode> exprs = Lists.newArrayList();
 
-      for (RelDataTypeField field : scanPrel.getRowType().getFieldList()) {
-        RexNode expr = scanPrel.getCluster().getRexBuilder().makeInputRef(field.getType(), field.getIndex());
+      for (RelDataTypeField field : prel.getRowType().getFieldList()) {
+        RexNode expr = prel.getCluster().getRexBuilder().makeInputRef(field.getType(), field.getIndex());
         exprs.add(expr);
       }
 
@@ -204,25 +204,35 @@ public class StarColumnConverter extends BasePrelVisitor<Prel, Void, RuntimeExce
 
       long tableId = tableNumber.getAndIncrement();
 
-      for (String name : scanPrel.getRowType().getFieldNames()) {
+      for (String name : prel.getRowType().getFieldNames()) {
         if (StarColumnHelper.isNonPrefixedStarColumn(name)) {
          fieldNames.add("T" +  tableId + StarColumnHelper.PREFIX_DELIMITER + name);  // Add prefix to * column.
        } else {
          fieldNames.add(name);  // Keep regular column as it is.
        }
      }
-      RelDataType rowType = RexUtil.createStructType(scanPrel.getCluster().getTypeFactory(),
+      RelDataType rowType = RexUtil.createStructType(prel.getCluster().getTypeFactory(),
          exprs, fieldNames, null);
 
      // insert a PAS.
-      ProjectPrel proj = new ProjectPrel(scanPrel.getCluster(), scanPrel.getTraitSet(), scanPrel, exprs, rowType);
+      ProjectPrel proj = new ProjectPrel(prel.getCluster(), prel.getTraitSet(), prel, exprs, rowType);
 
       return proj;
     } else {
-      return visitPrel(scanPrel, value);
+      return visitPrel(prel, value);
     }
   }
 
+  @Override
+  public Prel visitScan(ScanPrel scanPrel, Void value) throws RuntimeException {
+    return prefixTabNameToStar(scanPrel, value);
+  }
+
+  @Override
+  public Prel visitUnnest(UnnestPrel unnestPrel, Void value) throws RuntimeException {
+    return prefixTabNameToStar(unnestPrel, value);
+  }
+
   private List<String> makeUniqueNames(List<String> names) {
 
    // We have to search the set of original names, plus the set of unique names that will be used finally.
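
prefixTabNameToStar() generalizes the old visitScan() logic so that scans and unnests get the same
treatment: when a non-prefixed star column is present, a ProjectPrel is inserted that renames "*"
to "T" + tableId + the prefix delimiter + "*", which is what keeps two star expansions apart in
a query such as testLateralSqlStar3's "select Orders.*, c.* from ... unnest(c.orders) Orders(ord)"
later in this patch. A trimmed sketch of the renaming loop only, using names from the hunk above:

    // Prefix un-prefixed star columns with a per-table id; keep every other name as is.
    for (String name : prel.getRowType().getFieldNames()) {
      if (StarColumnHelper.isNonPrefixedStarColumn(name)) {
        fieldNames.add("T" + tableId + StarColumnHelper.PREFIX_DELIMITER + name);
      } else {
        fieldNames.add(name);
      }
    }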
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java
index c7e1b25..b8659d1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java
@@ -52,6 +52,7 @@ import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.sql.SqlCall;
 import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.SqlOperatorTable;
 import org.apache.calcite.sql.parser.SqlParseException;
 import org.apache.calcite.sql.parser.SqlParser;
@@ -81,6 +82,7 @@ import org.apache.drill.exec.planner.logical.DrillRelFactories;
 import org.apache.drill.exec.planner.physical.DrillDistributionTraitDef;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.rpc.user.UserSession;
+import static org.apache.calcite.util.Static.RESOURCE;
 
 import com.google.common.base.Joiner;
 import org.apache.drill.exec.store.ColumnExplorer;
@@ -261,6 +263,11 @@ public class SqlConverter {
             }
             changeNamesIfTableIsTemporary(tempNode);
           }
+          else if (((SqlCall) node).operand(0).getKind() == SqlKind.UNNEST) {
+            if (((SqlCall) node).operandCount() < 3) {
+              throw RESOURCE.validationError("Alias table and column name are required for UNNEST").ex();
+            }
+          }
           break;
         default:
           break;
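
The new branch rejects an UNNEST whose alias call carries fewer than three operands, i.e. the
UNNEST itself, a table alias, and at least one column alias, raising the validation error asserted
by the new TestLateralPlans cases. A sketch of the query shapes those tests exercise (the first two
strings are copied from the tests later in this patch; the third is illustrative):

    // Fails validation: no alias at all on the UNNEST.
    String noAlias = "select t.c_name from cp.`lateraljoin/nested-customer.json` t, unnest(t.orders) ";
    // Fails validation: table alias only, no column alias list.
    String tableAliasOnly = "select t.c_name, t2.orders from cp.`lateraljoin/nested-customer.json` t, unnest(t.orders) t2";
    // Passes validation: both a table alias and a column alias.
    String ok = "select t.c_name, t2.ord.o_shop from cp.`lateraljoin/nested-customer.json` t, unnest(t.orders) t2(ord)";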
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java
index b4c9fdd..3a17ef5 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java
@@ -19,7 +19,6 @@ package org.apache.drill.exec.physical.impl.lateraljoin;
 
 import org.apache.drill.test.BaseTestQuery;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Test;
 
 import java.nio.file.Paths;
@@ -27,7 +26,6 @@ import java.nio.file.Paths;
 import static junit.framework.TestCase.fail;
 
 public class TestE2EUnnestAndLateral extends BaseTestQuery {
-  //private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(TestE2EUnnestAndLateral.class);
 
   private static final String regularTestFile_1 = "cust_order_10_1.json";
   private static final String regularTestFile_2 = "cust_order_10_2.json";
@@ -51,7 +49,7 @@ public class TestE2EUnnestAndLateral extends BaseTestQuery {
   public void testLateral_WithLimitInSubQuery() throws Exception {
     String Sql = "SELECT customer.c_name, customer.c_address, orders.o_id, 
orders.o_amount " +
       "FROM cp.`lateraljoin/nested-customer.parquet` customer, LATERAL " +
-      "(SELECT O.o_id, O.o_amount FROM UNNEST(customer.orders) O LIMIT 1) 
orders";
+      "(SELECT t.ord.o_id as o_id, t.ord.o_amount as o_amount FROM 
UNNEST(customer.orders) t(ord) LIMIT 1) orders";
     test(Sql);
   }
 
@@ -59,7 +57,7 @@ public class TestE2EUnnestAndLateral extends BaseTestQuery {
   public void testLateral_WithFilterInSubQuery() throws Exception {
     String Sql = "SELECT customer.c_name, customer.c_address, orders.o_id, 
orders.o_amount " +
       "FROM cp.`lateraljoin/nested-customer.parquet` customer, LATERAL " +
-      "(SELECT O.o_id, O.o_amount FROM UNNEST(customer.orders) O WHERE 
O.o_amount > 10) orders";
+      "(SELECT t.ord.o_id as o_id, t.ord.o_amount as o_amount FROM 
UNNEST(customer.orders) t(ord) WHERE t.ord.o_amount > 10) orders";
     test(Sql);
   }
 
@@ -67,7 +65,7 @@ public class TestE2EUnnestAndLateral extends BaseTestQuery {
   public void testLateral_WithFilterAndLimitInSubQuery() throws Exception {
     String Sql = "SELECT customer.c_name, customer.c_address, orders.o_id, 
orders.o_amount " +
       "FROM cp.`lateraljoin/nested-customer.parquet` customer, LATERAL " +
-      "(SELECT O.o_id, O.o_amount FROM UNNEST(customer.orders) O WHERE 
O.o_amount > 10 LIMIT 1) orders";
+      "(SELECT t.ord.o_id as o_id, t.ord.o_amount as o_amount FROM 
UNNEST(customer.orders) t(ord) WHERE t.ord.o_amount > 10 LIMIT 1) orders";
     test(Sql);
   }
 
@@ -75,7 +73,7 @@ public class TestE2EUnnestAndLateral extends BaseTestQuery {
   public void testOuterApply_WithFilterAndLimitInSubQuery() throws Exception {
     String Sql = "SELECT customer.c_name, customer.c_address, orders.o_id, 
orders.o_amount " +
       "FROM cp.`lateraljoin/nested-customer.parquet` customer OUTER APPLY " +
-      "(SELECT O.o_id, O.o_amount FROM UNNEST(customer.orders) O WHERE 
O.o_amount > 10 LIMIT 1) orders";
+      "(SELECT t.ord.o_id as o_id , t.ord.o_amount as o_amount FROM 
UNNEST(customer.orders) t(ord) WHERE t.ord.o_amount > 10 LIMIT 1) orders";
     test(Sql);
   }
 
@@ -83,18 +81,24 @@ public class TestE2EUnnestAndLateral extends BaseTestQuery {
   public void testLeftLateral_WithFilterAndLimitInSubQuery() throws Exception {
     String Sql = "SELECT customer.c_name, customer.c_address, orders.o_id, 
orders.o_amount " +
       "FROM cp.`lateraljoin/nested-customer.parquet` customer LEFT JOIN 
LATERAL " +
-      "(SELECT O.o_id, O.o_amount FROM UNNEST(customer.orders) O WHERE 
O.o_amount > 10 LIMIT 1) orders ON TRUE";
+      "(SELECT t.ord.o_id as o_id, t.ord.o_amount as o_amount FROM 
UNNEST(customer.orders) t(ord) WHERE t.ord.o_amount > 10 LIMIT 1) orders ON 
TRUE";
     test(Sql);
   }
 
-  @Ignore("Nested repeated type columns doesn't work here")
   @Test
-  public void testNestedUnnest() throws Exception {
+  public void testMultiUnnestAtSameLevel() throws Exception {
     String Sql = "EXPLAIN PLAN FOR SELECT customer.c_name, customer.c_address, 
U1.order_id, U1.order_amt," +
       " U1.itemName, U1.itemNum" + " FROM 
cp.`lateraljoin/nested-customer.parquet` customer, LATERAL" +
-      " (SELECT O.o_id AS order_id, O.o_amount AS order_amt, U2.item_name AS 
itemName, U2.item_num AS itemNum" +
-      " FROM UNNEST(customer.orders) O, LATERAL" +
-      " (SELECT I.i_name AS item_name, I.i_number AS item_num FROM 
UNNEST(O.items) AS I) AS U2) AS U1";
+      " (SELECT t.ord.o_id AS order_id, t.ord.o_amount AS order_amt, 
U2.item_name AS itemName, U2.item_num AS " +
+        "itemNum FROM UNNEST(customer.orders) t(ord) , LATERAL" +
+      " (SELECT t1.ord.i_name AS item_name, t1.ord.i_number AS item_num FROM 
UNNEST(t.ord) AS t1(ord)) AS U2) AS U1";
+    test(Sql);
+  }
+
+  @Test
+  public void testNestedUnnest() throws Exception {
+    String Sql = "select * from (select customer.orders as orders from 
cp.`lateraljoin/nested-customer.parquet` customer ) t1," +
+        " lateral ( select t.ord.items as items from unnest(t1.orders) t(ord) 
) t2, unnest(t2.items) t3(item) ";
     test(Sql);
   }
 
@@ -106,7 +110,7 @@ public class TestE2EUnnestAndLateral extends BaseTestQuery {
   public void testMultipleBatchesLateralQuery() throws Exception {
     String sql = "SELECT customer.c_name, customer.c_address, 
orders.o_orderkey, orders.o_totalprice " +
       "FROM dfs.`lateraljoin/multipleFiles` customer, LATERAL " +
-      "(SELECT O.o_orderkey, O.o_totalprice FROM UNNEST(customer.c_orders) O) 
orders";
+      "(SELECT t.ord.o_orderkey as o_orderkey, t.ord.o_totalprice as 
o_totalprice FROM UNNEST(customer.c_orders) t(ord)) orders";
     test(sql);
   }
 
@@ -114,7 +118,7 @@ public class TestE2EUnnestAndLateral extends BaseTestQuery {
   public void testMultipleBatchesLateral_WithLimitInSubQuery() throws 
Exception {
     String sql = "SELECT customer.c_name, customer.c_address, 
orders.o_orderkey, orders.o_totalprice " +
       "FROM dfs.`lateraljoin/multipleFiles` customer, LATERAL " +
-      "(SELECT O.o_orderkey, O.o_totalprice FROM UNNEST(customer.c_orders) O 
LIMIT 10) orders";
+      "(SELECT t.ord.o_orderkey as o_orderkey, t.ord.o_totalprice as 
o_totalprice FROM UNNEST(customer.c_orders) t(ord) LIMIT 10) orders";
     test(sql);
   }
 
@@ -122,7 +126,7 @@ public class TestE2EUnnestAndLateral extends BaseTestQuery {
   public void testMultipleBatchesLateral_WithLimitFilterInSubQuery() throws 
Exception {
     String sql = "SELECT customer.c_name, customer.c_address, 
orders.o_orderkey, orders.o_totalprice " +
       "FROM dfs.`lateraljoin/multipleFiles` customer, LATERAL " +
-      "(SELECT O.o_orderkey, O.o_totalprice FROM UNNEST(customer.c_orders) O 
WHERE O.o_totalprice > 100000 LIMIT 2) " +
+      "(SELECT t.ord.o_orderkey as o_orderkey, t.ord.o_totalprice as 
o_totalprice FROM UNNEST(customer.c_orders) t(ord) WHERE t.ord.o_totalprice > 
100000 LIMIT 2) " +
       "orders";
     test(sql);
   }
@@ -139,7 +143,7 @@ public class TestE2EUnnestAndLateral extends BaseTestQuery {
 
       String sql = "SELECT customer.c_name, customer.c_address, 
orders.o_orderkey, orders.o_totalprice " +
         "FROM dfs.`lateraljoin/multipleFiles` customer, LATERAL " +
-        "(SELECT O.o_orderkey, O.o_totalprice FROM UNNEST (customer.c_orders) 
O) orders";
+        "(SELECT t.ord.o_orderkey as o_orderkey, t.ord.o_totalprice as 
o_totalprice FROM UNNEST (customer.c_orders) t(ord)) orders";
       test(sql);
     } catch (Exception ex) {
       fail();
@@ -155,7 +159,7 @@ public class TestE2EUnnestAndLateral extends BaseTestQuery {
 
       String sql = "SELECT customer.c_name, customer.c_address, 
orders.o_orderkey, orders.o_totalprice " +
         "FROM dfs.`lateraljoin/multipleFiles` customer, LATERAL " +
-        "(SELECT O.o_orderkey, O.o_totalprice FROM UNNEST(customer.c_orders) 
O) orders";
+        "(SELECT t.ord.o_orderkey as o_orderkey, t.ord.o_totalprice as 
o_totalprice FROM UNNEST(customer.c_orders) t(ord)) orders";
       test(sql);
     } catch (Exception ex) {
       fail();
@@ -171,7 +175,7 @@ public class TestE2EUnnestAndLateral extends BaseTestQuery {
 
       String sql = "SELECT customer.c_name, customer.c_address, 
customer.c_nationkey, orders.o_orderkey, " +
         "orders.o_totalprice FROM dfs.`lateraljoin/multipleFiles` customer, 
LATERAL " +
-        "(SELECT O.o_orderkey, O.o_totalprice, O.o_shippriority FROM 
UNNEST(customer.c_orders) O) orders";
+        "(SELECT t.ord.o_orderkey as o_orderkey, t.ord.o_totalprice as 
o_totalprice, t.ord.o_shippriority o_shippriority FROM 
UNNEST(customer.c_orders) t(ord)) orders";
 
       test(sql);
     } catch (Exception ex) {
@@ -189,7 +193,7 @@ public class TestE2EUnnestAndLateral extends BaseTestQuery {
   public void testMultipleBatchesLateral_WithLimitInParent() throws Exception {
     String sql = "SELECT customer.c_name, customer.c_address, 
orders.o_orderkey, orders.o_totalprice " +
       "FROM dfs.`lateraljoin/multipleFiles` customer, LATERAL " +
-      "(SELECT O.o_orderkey, O.o_totalprice FROM UNNEST(customer.c_orders) O 
WHERE O.o_totalprice > 100000 LIMIT 2) " +
+      "(SELECT t.ord.o_orderkey as o_orderkey, t.ord.o_totalprice  as 
o_totalprice FROM UNNEST(customer.c_orders) t(ord) WHERE t.ord.o_totalprice > 
100000 LIMIT 2) " +
       "orders LIMIT 1";
     test(sql);
   }
@@ -198,7 +202,7 @@ public class TestE2EUnnestAndLateral extends BaseTestQuery {
   public void testMultipleBatchesLateral_WithFilterInParent() throws Exception 
{
     String sql = "SELECT customer.c_name, customer.c_address, 
orders.o_orderkey, orders.o_totalprice " +
       "FROM dfs.`lateraljoin/multipleFiles` customer, LATERAL " +
-      "(SELECT O.o_orderkey, O.o_totalprice FROM UNNEST(customer.c_orders) O 
WHERE O.o_totalprice > 100000 LIMIT 2) " +
+      "(SELECT t.ord.o_orderkey as o_orderkey, t.ord.o_totalprice as 
o_totalprice FROM UNNEST(customer.c_orders) t(ord) WHERE t.ord.o_totalprice > 
100000 LIMIT 2) " +
       "orders WHERE orders.o_totalprice > 240000";
     test(sql);
   }
@@ -207,7 +211,7 @@ public class TestE2EUnnestAndLateral extends BaseTestQuery {
   public void testMultipleBatchesLateral_WithGroupByInParent() throws 
Exception {
     String sql = "SELECT customer.c_name, avg(orders.o_totalprice) AS avgPrice 
" +
       "FROM dfs.`lateraljoin/multipleFiles` customer, LATERAL " +
-      "(SELECT O.o_totalprice FROM UNNEST(customer.c_orders) O WHERE 
O.o_totalprice > 100000 LIMIT 2) " +
+      "(SELECT t.ord.o_totalprice as o_totalprice FROM 
UNNEST(customer.c_orders) t(ord) WHERE t.ord.o_totalprice > 100000 LIMIT 2) " +
       "orders GROUP BY customer.c_name";
     test(sql);
   }
@@ -216,7 +220,7 @@ public class TestE2EUnnestAndLateral extends BaseTestQuery {
   public void testMultipleBatchesLateral_WithOrderByInParent() throws 
Exception {
     String sql = "SELECT customer.c_name, customer.c_address, 
orders.o_orderkey, orders.o_totalprice " +
       "FROM dfs.`lateraljoin/multipleFiles` customer, LATERAL " +
-      "(SELECT O.o_orderkey, O.o_totalprice FROM UNNEST(customer.c_orders) O) 
orders " +
+      "(SELECT t.ord.o_orderkey as o_orderkey, t.ord.o_totalprice as 
o_totalprice FROM UNNEST(customer.c_orders) t(ord)) orders " +
       "ORDER BY orders.o_orderkey";
     test(sql);
   }
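
With the alias syntax in place, nested unnests no longer need the previously ignored workaround:
each level unnests the column exposed by the level above it under its own table and column alias.
A condensed sketch of the pattern the new testNestedUnnest exercises (query taken from the test
above, only reformatted):

    String sql = "select * from "
        + "(select customer.orders as orders from cp.`lateraljoin/nested-customer.parquet` customer) t1, "
        + "lateral (select t.ord.items as items from unnest(t1.orders) t(ord)) t2, "
        + "unnest(t2.items) t3(item)";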
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestLateralPlans.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestLateralPlans.java
index 2125bd1..9e19729 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestLateralPlans.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestLateralPlans.java
@@ -20,10 +20,15 @@ package org.apache.drill.exec.physical.impl.lateraljoin;
 import static org.junit.Assert.assertEquals;
 
 import org.apache.drill.PlanTestBase;
+import org.apache.drill.common.exceptions.UserRemoteException;
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.test.BaseTestQuery;
+import org.apache.drill.test.ClientFixture;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterFixtureBuilder;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Test;
+import org.junit.Ignore;
 
 public class TestLateralPlans extends BaseTestQuery {
 
@@ -40,8 +45,8 @@ public class TestLateralPlans extends BaseTestQuery {
 
   @Test
   public void testLateralSql() throws Exception {
-    String Sql = "select t.c_name, t2.o_shop as o_shop from 
cp.`lateraljoin/nested-customer.json` t,"
-                 + " unnest(t.orders) t2 limit 1";
+    String Sql = "select t.c_name, t2.ord.o_shop as o_shop from 
cp.`lateraljoin/nested-customer.json` t," +
+        " unnest(t.orders) t2(ord) limit 1";
     testBuilder()
         .unOrdered()
         .sqlQuery(Sql)
@@ -52,21 +57,20 @@ public class TestLateralPlans extends BaseTestQuery {
 
   @Test
   public void testExplainLateralSql() throws Exception {
-    String Sql = "explain plan without implementation for"
-        + " select t.c_name, t2.o_shop as o_shop from 
cp.`lateraljoin/nested-customer.json` t,"
-        + " unnest(t.orders) t2 limit 1";
+    String Sql = "explain plan without implementation for select t.c_name, 
t2.ord.o_shop as o_shop from cp.`lateraljoin/nested-customer.json` t," +
+        " unnest(t.orders) t2(ord) limit 1";
     test(Sql);
   }
 
   @Test
   public void testFilterPushCorrelate() throws Exception {
     test("alter session set `planner.slice_target`=1");
-    String query = "select t.c_name, t2.o_shop as o_shop from 
cp.`lateraljoin/nested-customer.json` t,"
-        + " unnest(t.orders) t2 where t.c_name='customer1' AND t2.o_shop='Meno 
Park 1st' ";
-    PlanTestBase.testPlanMatchingPatterns(query,
-        new String[] 
{"Correlate(.*[\n\r])+.*Filter(.*[\n\r])+.*Scan(.*[\n\r])+.*Filter"},
-        new String[]{}
-    );
+    String query = "select t.c_name, t2.ord.o_shop as o_shop from 
cp.`lateraljoin/nested-customer.json` t,"
+        + " unnest(t.orders) t2(ord) where t.c_name='customer1' AND 
t2.ord.o_shop='Meno Park 1st' ";
+
+    PlanTestBase.testPlanMatchingPatterns(query, new 
String[]{"Correlate(.*[\n\r])+.*Filter(.*[\n\r])+.*Scan(.*[\n\r])+.*Filter"},
+        new String[]{});
+
     testBuilder()
         .unOrdered()
         .sqlQuery(query)
@@ -76,41 +80,199 @@ public class TestLateralPlans extends BaseTestQuery {
   }
 
   @Test
-  @Ignore("naming of single column")
   public void testLateralSqlPlainCol() throws Exception {
-    String Sql = "select t.c_name, t2.c_phone from 
cp.`lateraljoin/nested-customer.json` t, unnest(t.c_phone) t2 limit 1";
+    String Sql = "select t.c_name, t2.phone as c_phone from 
cp.`lateraljoin/nested-customer.json` t,"
+        + " unnest(t.c_phone) t2(phone) limit 1";
     testBuilder()
         .unOrdered()
         .sqlQuery(Sql)
-        .baselineColumns("c_name", "c_phone_flat")
+        .baselineColumns("c_name", "c_phone")
         .baselineValues("customer1", "6505200001")
         .go();
-
   }
 
   @Test
   public void testLateralSqlStar() throws Exception {
-    String Sql = "select * from cp.`lateraljoin/nested-customer.json` t, 
unnest(t.orders) t2 limit 1";
-    test(Sql);
+    String Sql = "select * from cp.`lateraljoin/nested-customer.json` t, 
unnest(t.orders) Orders(ord) limit 0";
+
+    testBuilder()
+        .unOrdered()
+        .sqlQuery(Sql)
+        .baselineColumns("c_name", "c_id", "c_phone", "orders", "c_address", 
"ord")
+        .expectsEmptyResultSet()
+        .go();
+  }
+
+  @Test
+  public void testLateralSqlStar2() throws Exception {
+    String Sql = "select c.* from cp.`lateraljoin/nested-customer.json` c, 
unnest(c.orders) Orders(ord) limit 0";
+
+    testBuilder()
+        .unOrdered()
+        .sqlQuery(Sql)
+        .baselineColumns("c_name", "c_id", "c_phone", "orders", "c_address")
+        .expectsEmptyResultSet()
+        .go();
+  }
+
+  @Test
+  public void testLateralSqlStar3() throws Exception {
+    String Sql = "select Orders.*, c.* from 
cp.`lateraljoin/nested-customer.json` c, unnest(c.orders) Orders(ord) limit 0";
+
+    testBuilder()
+        .unOrdered()
+        .sqlQuery(Sql)
+        .baselineColumns("ord","c_name", "c_id", "c_phone", "orders", 
"c_address")
+        .expectsEmptyResultSet()
+        .go();
+  }
+
+  @Test
+  public void testLateralSqlStar4() throws Exception {
+    String Sql = "select Orders.* from cp.`lateraljoin/nested-customer.json` 
c, unnest(c.orders) Orders(ord) limit 0";
+
+    testBuilder()
+        .unOrdered()
+        .sqlQuery(Sql)
+        .baselineColumns("ord")
+        .expectsEmptyResultSet()
+        .go();
   }
 
   @Test
-  @Ignore("To be fixed: how to specify columns names for table alias in 
dynamic case")
   public void testLateralSqlWithAS() throws Exception {
-    String Sql = "select t.c_name, t2.o_shop from 
cp.`lateraljoin/nested-customer.parquet` t,"
-        + " unnest(t.orders) as t2(o_shop) limit 1";
+    String Sql = "select t.c_name, t2.orders from 
cp.`lateraljoin/nested-customer.parquet` t,"
+        + " unnest(t.orders) as t2(orders)";
+    String baselineQuery = "select t.c_name, t2.orders from 
cp.`lateraljoin/nested-customer.parquet` t inner join" +
+        " (select c_name, flatten(orders) from cp" +
+        ".`lateraljoin/nested-customer.parquet` ) as t2(name, orders) on 
t.c_name = t2.name";
+
     testBuilder()
         .unOrdered()
         .sqlQuery(Sql)
-        .baselineColumns("c_name", "o_shop")
-        .baselineValues("customer1", "Meno Park 1st")
+        .sqlBaselineQuery(baselineQuery)
         .go();
+  }
+
+  @Test
+  public void testMultiUnnestLateralAtSameLevel() throws Exception {
+    String Sql = "select t.c_name, t2.orders, t3.orders from 
cp.`lateraljoin/nested-customer.parquet` t," +
+        " LATERAL ( select t2.orders from unnest(t.orders) as t2(orders)) as 
t2, LATERAL " +
+        "(select t3.orders from unnest(t.orders) as t3(orders)) as t3";
+    String baselineQuery = "select t.c_name, t2.orders, t3.orders from 
cp.`lateraljoin/nested-customer.parquet` t inner join" +
+        " (select c_name, flatten(orders) from 
cp.`lateraljoin/nested-customer.parquet` ) as t2 (name, orders) on t.c_name = 
t2.name " +
+        " inner join (select c_name, flatten(orders) from 
cp.`lateraljoin/nested-customer.parquet` ) as t3(name, orders) on t.c_name = 
t3.name";
 
+    testBuilder()
+        .unOrdered()
+        .sqlQuery(Sql)
+        .sqlBaselineQuery(baselineQuery)
+        .go();
   }
 
   @Test
   public void testSubQuerySql() throws Exception {
-    String Sql = "select t2.os.* from (select t.orders as os from 
cp.`lateraljoin/nested-customer.parquet` t) t2";
-    test(Sql);
+    String Sql = "select t.c_name, d1.items as items0 , t3.items as items1 
from cp.`lateraljoin/nested-customer.parquet` t," +
+        " lateral (select t2.ord.items as items from unnest(t.orders) t2(ord)) 
d1," +
+        " unnest(d1.items) t3(items)";
+
+    String baselineQuery = "select t.c_name, t3.orders.items as items0, 
t3.items as items1 from cp.`lateraljoin/nested-customer.parquet` t " +
+        " inner join (select c_name, f, flatten(t1.f.items) from (select 
c_name, flatten(orders) as f from cp.`lateraljoin/nested-customer.parquet`) as 
t1 ) " +
+        "t3(name, orders, items) on t.c_name = t3.name ";
+    testBuilder()
+        .unOrdered()
+        .sqlQuery(Sql)
+        .sqlBaselineQuery(baselineQuery)
+        .go();
+  }
+
+  @Test
+  public void testUnnestWithFilter() throws Exception {
+    String Sql = "select t.c_name, d1.items as items0, t3.items as items1 from 
cp.`lateraljoin/nested-customer.parquet` t," +
+        " lateral (select t2.ord.items as items from unnest(t.orders) t2(ord)) 
d1," +
+        " unnest(d1.items) t3(items) where t.c_id > 1";
+
+    String baselineQuery = "select t.c_name, t3.orders.items as items0, 
t3.items as items1 from cp.`lateraljoin/nested-customer.parquet` t " +
+        " inner join (select c_name, f, flatten(t1.f.items) from (select 
c_name, flatten(orders) as f from cp.`lateraljoin/nested-customer.parquet`) as 
t1 ) " +
+        "t3(name, orders, items) on t.c_name = t3.name where t.c_id > 1";
+    testBuilder()
+        .unOrdered()
+        .sqlQuery(Sql)
+        .sqlBaselineQuery(baselineQuery)
+        .go();
+  }
+
+  @Test
+  @Ignore ()
+  public void testUnnestWithAggInSubquery() throws Exception {
+    String Sql = "select t.c_name, t3.items from 
cp.`lateraljoin/nested-customer.parquet` t," +
+        " lateral (select t2.ord.items as items from unnest(t.orders) t2(ord)) 
d1," +
+        " lateral (select avg(t3.items.i_number) from unnest(d1.items) 
t3(items)) where t.c_id > 1";
+
+    String baselineQuery = "select t.c_name, avg(t3.items.i_number) from 
cp.`lateraljoin/nested-customer.parquet` t " +
+        " inner join (select c_name, f, flatten(t1.f.items) from (select 
c_name, flatten(orders) as f from cp.`lateraljoin/nested-customer.parquet`) as 
t1 ) " +
+        "t3(name, orders, items) on t.c_name = t3.name where t.c_id > 1 group 
by t.c_name";
+
+    ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
+        .setOptionDefault(ExecConstants.ENABLE_UNNEST_LATERAL_KEY, true);
+
+    try (ClusterFixture cluster = builder.build();
+         ClientFixture client = cluster.clientFixture()) {
+      client
+          .queryBuilder()
+          .sql(Sql)
+          .run();
+    }
+  }
+
+  @Test
+  public void testUnnestWithAggOnOuterTable() throws Exception {
+    String Sql = "select avg(d2.inum) from 
cp.`lateraljoin/nested-customer.parquet` t," +
+        " lateral (select t2.ord.items as items from unnest(t.orders) t2(ord)) 
d1," +
+        " lateral (select t3.items.i_number as inum from unnest(d1.items) 
t3(items)) d2 where t.c_id > 1 group by t.c_id";
+
+    String baselineQuery = "select avg(t3.items.i_number) from 
cp.`lateraljoin/nested-customer.parquet` t " +
+        " inner join (select c_name, f, flatten(t1.f.items) from (select 
c_name, flatten(orders) as f from cp.`lateraljoin/nested-customer.parquet`) as 
t1 ) " +
+        "t3(name, orders, items) on t.c_name = t3.name where t.c_id > 1 group 
by t.c_id";
+
+    testBuilder()
+        .unOrdered()
+        .sqlQuery(Sql)
+        .sqlBaselineQuery(baselineQuery)
+        .go();
+  }
+
+  @Test
+  public void testUnnestTableAndColumnAlias() throws Exception {
+    String Sql = "select t.c_name from cp.`lateraljoin/nested-customer.json` 
t, unnest(t.orders) ";
+    ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
+        .setOptionDefault(ExecConstants.ENABLE_UNNEST_LATERAL_KEY, true);
+
+    try (ClusterFixture cluster = builder.build();
+         ClientFixture client = cluster.clientFixture()) {
+      client
+          .queryBuilder()
+          .sql(Sql)
+          .run();
+    } catch (UserRemoteException ex) {
+      assert(ex.getMessage().contains("Alias table and column name are 
required for UNNEST"));
+    }
+  }
+
+  @Test
+  public void testUnnestColumnAlias() throws Exception {
+    String Sql = "select t.c_name, t2.orders from 
cp.`lateraljoin/nested-customer.json` t, unnest(t.orders) t2";
+    ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
+        .setOptionDefault(ExecConstants.ENABLE_UNNEST_LATERAL_KEY, true);
+
+    try (ClusterFixture cluster = builder.build();
+         ClientFixture client = cluster.clientFixture()) {
+      client
+          .queryBuilder()
+          .sql(Sql)
+          .run();
+    } catch (UserRemoteException ex) {
+      assert(ex.getMessage().contains("Alias table and column name are 
required for UNNEST"));
+    }
   }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestCorrectness.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestCorrectness.java
index 8ec0c96..137966b 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestCorrectness.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestCorrectness.java
@@ -42,7 +42,6 @@ import org.apache.drill.test.rowSet.RowSetBuilder;
 import org.apache.drill.test.rowSet.schema.SchemaBuilder;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -133,7 +132,6 @@ import static org.junit.Assert.assertTrue;
   }
 
   @Test
-  @Ignore("With DRILL-6321 commits, Unnest's output could be multiplec olumns")
   public void testUnnestMapColumn() {
 
     Object[][] data = getMapData();
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java
index 70a32f8..9318c51 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java
@@ -28,10 +28,13 @@ import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.base.LateralContract;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.config.LateralJoinPOP;
+import org.apache.drill.exec.physical.config.Project;
 import org.apache.drill.exec.physical.config.UnnestPOP;
 import org.apache.drill.exec.physical.impl.MockRecordBatch;
 import org.apache.drill.exec.physical.impl.join.LateralJoinBatch;
+import org.apache.drill.exec.physical.impl.project.ProjectRecordBatch;
 import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
+import org.apache.drill.exec.planner.logical.DrillLogicalTestutils;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.record.VectorContainer;
@@ -466,7 +469,6 @@ public class TestUnnestWithLateralCorrectness extends SubOperatorTest {
 
   }
 
-
   @Test
   public void testUnnestNonArrayColumn() {
 
@@ -555,8 +557,15 @@ public class TestUnnestWithLateralCorrectness extends SubOperatorTest {
     final UnnestRecordBatch unnestBatch =
         new UnnestRecordBatch(unnestPopConfig, fixture.getFragmentContext());
 
+    // A project is required to rename the columns so as to disambiguate the same column name from
+    // the unnest operator and the regular scan.
+    final Project projectPopConfig = new Project(DrillLogicalTestutils.parseExprs("unnestColumn", "unnestColumn1"), null);
+
+    final ProjectRecordBatch projectBatch =
+        new ProjectRecordBatch(projectPopConfig, unnestBatch, fixture.getFragmentContext());
+
     final LateralJoinBatch lateralJoinBatch =
-        new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(), incomingMockBatch, unnestBatch);
+        new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(), incomingMockBatch, projectBatch);
 
     // set pointer to Lateral in unnest
     unnestBatch.setIncoming((LateralContract) lateralJoinBatch);
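
Because unnest now reuses the incoming column name, the unit-test pipeline puts a Project between
the unnest and the lateral join so the unnested column cannot collide with the scan-side column of
the same name. A trimmed sketch of that wiring, taken from the hunk above ("unnestColumn" and
"unnestColumn1" are the test's own names):

    // Rename unnestColumn to unnestColumn1 on the unnest side before it meets the scan side.
    final Project projectPopConfig =
        new Project(DrillLogicalTestutils.parseExprs("unnestColumn", "unnestColumn1"), null);
    final ProjectRecordBatch projectBatch =
        new ProjectRecordBatch(projectPopConfig, unnestBatch, fixture.getFragmentContext());
    final LateralJoinBatch lateralJoinBatch =
        new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(), incomingMockBatch, projectBatch);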
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java
index 66354ff..b8a219b 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java
@@ -21,9 +21,6 @@ import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
-import org.antlr.runtime.ANTLRStringStream;
-import org.antlr.runtime.CommonTokenStream;
-import org.antlr.runtime.RecognitionException;
 import org.apache.calcite.rel.RelFieldCollation;
 import org.apache.drill.exec.coord.ClusterCoordinator;
 import org.apache.drill.exec.memory.BufferAllocator;
@@ -34,6 +31,7 @@ import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.FragmentStats;
 import org.apache.drill.exec.physical.base.PhysicalVisitor;
 import org.apache.drill.exec.planner.PhysicalPlanReader;
+import org.apache.drill.exec.planner.logical.DrillLogicalTestutils;
 import org.apache.drill.exec.proto.CoordinationProtos;
 import org.apache.drill.exec.rpc.control.Controller;
 import org.apache.drill.exec.rpc.control.WorkEventBus;
@@ -46,12 +44,8 @@ import org.apache.drill.test.BaseDirTestWatcher;
 import org.apache.drill.test.DrillTestWrapper;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.expression.FieldReference;
 import org.apache.drill.common.expression.LogicalExpression;
-import org.apache.drill.common.expression.PathSegment;
 import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.expression.parser.ExprLexer;
-import org.apache.drill.common.expression.parser.ExprParser;
 import org.apache.drill.common.logical.data.JoinCondition;
 import org.apache.drill.common.logical.data.NamedExpression;
 import org.apache.drill.common.logical.data.Order;
@@ -131,33 +125,19 @@ public class PhysicalOpUnitTestBase extends ExecTest {
 
   @Override
   protected LogicalExpression parseExpr(String expr) {
-    ExprLexer lexer = new ExprLexer(new ANTLRStringStream(expr));
-    CommonTokenStream tokens = new CommonTokenStream(lexer);
-    ExprParser parser = new ExprParser(tokens);
-    try {
-      return parser.parse().e;
-    } catch (RecognitionException e) {
-      throw new RuntimeException("Error parsing expression: " + expr);
-    }
+    return DrillLogicalTestutils.parseExpr(expr);
   }
 
  protected Order.Ordering ordering(String expression, RelFieldCollation.Direction direction, RelFieldCollation.NullDirection nullDirection) {
-    return new Order.Ordering(direction, parseExpr(expression), nullDirection);
+    return DrillLogicalTestutils.ordering(expression, direction, nullDirection);
  }
 
  protected JoinCondition joinCond(String leftExpr, String relationship, String rightExpr) {
-    return new JoinCondition(relationship, parseExpr(leftExpr), parseExpr(rightExpr));
+    return DrillLogicalTestutils.joinCond(leftExpr, relationship, rightExpr);
  }
 
  protected List<NamedExpression> parseExprs(String... expressionsAndOutputNames) {
-    Preconditions.checkArgument(expressionsAndOutputNames.length %2 ==0, "List of expressions and output field names" +
-        " is not complete, each expression must explicitly give and output name,");
-    List<NamedExpression> ret = new ArrayList<>();
-    for (int i = 0; i < expressionsAndOutputNames.length; i += 2) {
-      ret.add(new NamedExpression(parseExpr(expressionsAndOutputNames[i]),
-          new FieldReference(new SchemaPath(new PathSegment.NameSegment(expressionsAndOutputNames[i+1])))));
-    }
-    return ret;
+    return DrillLogicalTestutils.parseExprs(expressionsAndOutputNames);
   }
 
   protected static class BatchIterator implements Iterable<VectorAccessible> {
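
The expression-parsing helpers that used to be private to PhysicalOpUnitTestBase now delegate to the
static DrillLogicalTestutils class added below, so operator tests that do not extend this base class
(such as TestUnnestWithLateralCorrectness above) can build NamedExpressions too. A minimal usage
sketch; the column names are illustrative:

    // parseExprs takes (expression, outputName) pairs; parseExpr parses a single Drill expression.
    List<NamedExpression> exprs = DrillLogicalTestutils.parseExprs("unnestColumn", "unnestColumn1");
    LogicalExpression expr = DrillLogicalTestutils.parseExpr("unnestColumn");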
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/planner/logical/DrillLogicalTestutils.java b/exec/java-exec/src/test/java/org/apache/drill/exec/planner/logical/DrillLogicalTestutils.java
new file mode 100644
index 0000000..a8927bc
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/planner/logical/DrillLogicalTestutils.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import com.google.common.base.Preconditions;
+import org.antlr.runtime.ANTLRStringStream;
+import org.antlr.runtime.CommonTokenStream;
+import org.antlr.runtime.RecognitionException;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.PathSegment;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.expression.parser.ExprLexer;
+import org.apache.drill.common.expression.parser.ExprParser;
+import org.apache.drill.common.logical.data.JoinCondition;
+import org.apache.drill.common.logical.data.NamedExpression;
+import org.apache.drill.common.logical.data.Order;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class DrillLogicalTestutils {
+  public static Order.Ordering ordering(String expression,
+                                        RelFieldCollation.Direction direction,
+                                        RelFieldCollation.NullDirection nullDirection) {
+    return new Order.Ordering(direction, parseExpr(expression), nullDirection);
+  }
+
+  public static JoinCondition joinCond(String leftExpr, String relationship, String rightExpr) {
+    return new JoinCondition(relationship, parseExpr(leftExpr), parseExpr(rightExpr));
+  }
+
+  public static List<NamedExpression> parseExprs(String... expressionsAndOutputNames) {
+    Preconditions.checkArgument(expressionsAndOutputNames.length % 2 == 0,
+      "List of expressions and output field names is not complete, each expression must explicitly give an output name.");
+    List<NamedExpression> ret = new ArrayList<>();
+    for (int i = 0; i < expressionsAndOutputNames.length; i += 2) {
+      ret.add(new NamedExpression(parseExpr(expressionsAndOutputNames[i]),
+        new FieldReference(new SchemaPath(new PathSegment.NameSegment(expressionsAndOutputNames[i + 1])))));
+    }
+    return ret;
+  }
+
+  public static LogicalExpression parseExpr(String expr) {
+    ExprLexer lexer = new ExprLexer(new ANTLRStringStream(expr));
+    CommonTokenStream tokens = new CommonTokenStream(lexer);
+    ExprParser parser = new ExprParser(tokens);
+    try {
+      return parser.parse().e;
+    } catch (RecognitionException e) {
+      throw new RuntimeException("Error parsing expression: " + expr);
+    }
+  }
+}
\ No newline at end of file
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/DrillTestWrapper.java b/exec/java-exec/src/test/java/org/apache/drill/test/DrillTestWrapper.java
index 57bc79c..0dfc1f7 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/DrillTestWrapper.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/DrillTestWrapper.java
@@ -515,6 +515,15 @@ public class DrillTestWrapper {
       addTypeInfoIfMissing(actual.get(0), testBuilder);
       addToMaterializedResults(actualRecords, actual, loader);
 
+      // If the actual result contains zero records, no baseline records exist,
+      // and baselineColumns were provided, compare the actual column count and
+      // names with the expected columns.
+      if (actualRecords.size() == 0
+          && (baselineRecords == null || baselineRecords.size() == 0)
+          && baselineColumns != null) {
+        checkColumnDef(loader.getSchema());
+      }
+
      // If baseline data was not provided to the test builder directly, we must run a query for the baseline, this includes
       // the cases where the baseline is stored in a file.
       if (baselineRecords == null) {
@@ -531,6 +540,18 @@ public class DrillTestWrapper {
     }
   }
 
+  public void checkColumnDef(BatchSchema batchSchema) throws Exception {
+    assert (batchSchema != null && batchSchema.getFieldCount() == baselineColumns.length);
+    for (int i = 0; i < baselineColumns.length; ++i) {
+      if (!SchemaPath.parseFromString(baselineColumns[i]).equals(
+          SchemaPath.parseFromString(batchSchema.getColumn(i).getName()))) {
+        throw new Exception("Expected column name at index " + i + " does not match: "
+            + baselineColumns[i] + " is not " +
+            batchSchema.getColumn(i).getName());
+      }
+      }
+    }
+  }
+
   /**
    * Use this method only if necessary to validate one query against another. If you are just validating against a
    * baseline file use one of the simpler interfaces that will write the validation query for you.
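
checkColumnDef() makes baselineColumns meaningful for empty result sets: when a query returns zero
records and no baseline records were supplied, the wrapper now verifies the actual schema's column
count and names against the declared baseline columns. A sketch of the calling pattern, mirroring
the new testLateralSqlStar4 earlier in this patch:

    testBuilder()
        .unOrdered()
        .sqlQuery("select Orders.* from cp.`lateraljoin/nested-customer.json` c, "
            + "unnest(c.orders) Orders(ord) limit 0")
        .baselineColumns("ord")            // names are checked against the LIMIT 0 schema
        .expectsEmptyResultSet()
        .go();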
diff --git a/pom.xml b/pom.xml
index 620f73c..206af7e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -45,7 +45,7 @@
     <dep.guava.version>18.0</dep.guava.version>
     <forkCount>2</forkCount>
     <parquet.version>1.8.1-drill-r0</parquet.version>
-    <calcite.version>1.16.0-drill-r1</calcite.version>
+    <calcite.version>1.16.0-drill-r3</calcite.version>
     <avatica.version>1.11.0</avatica.version>
     <janino.version>2.7.6</janino.version>
     <sqlline.version>1.1.9-drill-r7</sqlline.version>
