This is an automated email from the ASF dual-hosted git repository.

parthc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit fde66d776cb16e77c1b3436c8a21364a43fb5f8f
Author: Hanumath Rao Maduri <hanu....@gmail.com>
AuthorDate: Wed May 30 17:59:51 2018 -0700

    DRILL-6456: Planner shouldn't create any exchanges on the right side of 
Lateral Join.
    
    This closes #1299
---
 .../visitor/ExcessiveExchangeIdentifier.java       |  72 ++++++++-
 .../impl/lateraljoin/TestLateralPlans.java         | 164 ++++++++++++++++++++-
 2 files changed, 227 insertions(+), 9 deletions(-)

diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/ExcessiveExchangeIdentifier.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/ExcessiveExchangeIdentifier.java
index 7bfe214..b4ed5e0 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/ExcessiveExchangeIdentifier.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/ExcessiveExchangeIdentifier.java
@@ -19,8 +19,8 @@ package org.apache.drill.exec.planner.physical.visitor;
 
 import java.util.Collections;
 import java.util.List;
-
 import org.apache.drill.exec.planner.fragment.DistributionAffinity;
+import org.apache.drill.exec.planner.physical.CorrelatePrel;
 import org.apache.drill.exec.planner.physical.ExchangePrel;
 import org.apache.drill.exec.planner.physical.Prel;
 import org.apache.drill.exec.planner.physical.ScanPrel;
@@ -28,9 +28,11 @@ import org.apache.drill.exec.planner.physical.ScreenPrel;
 import org.apache.calcite.rel.RelNode;
 
 import com.google.common.collect.Lists;
+import org.apache.drill.exec.planner.physical.UnnestPrel;
 
 public class ExcessiveExchangeIdentifier extends BasePrelVisitor<Prel, 
ExcessiveExchangeIdentifier.MajorFragmentStat, RuntimeException> {
   private final long targetSliceSize;
+  private CorrelatePrel topMostLateralJoin = null;
 
   public ExcessiveExchangeIdentifier(long targetSliceSize) {
     this.targetSliceSize = targetSliceSize;
@@ -45,18 +47,28 @@ public class ExcessiveExchangeIdentifier extends 
BasePrelVisitor<Prel, Excessive
   public Prel visitExchange(ExchangePrel prel, MajorFragmentStat parent) 
throws RuntimeException {
     parent.add(prel);
     MajorFragmentStat newFrag = new MajorFragmentStat();
+    newFrag.setRightSideOfLateral(parent.isRightSideOfLateral());
     Prel newChild = ((Prel) prel.getInput()).accept(this, newFrag);
-
-    if (newFrag.isSingular() && parent.isSingular() &&
-        // if one of them has strict distribution or none, we can remove the 
exchange
-        (!newFrag.isDistributionStrict() || !parent.isDistributionStrict())
-        ) {
+    if (canRemoveExchange(parent, newFrag)) {
       return newChild;
     } else {
       return (Prel) prel.copy(prel.getTraitSet(), 
Collections.singletonList((RelNode) newChild));
     }
   }
 
+  private boolean canRemoveExchange(MajorFragmentStat parentFrag, 
MajorFragmentStat childFrag) {
+    if (childFrag.isSingular() && parentFrag.isSingular() &&
+       (!childFrag.isDistributionStrict() || 
!parentFrag.isDistributionStrict())) {
+      return true;
+    }
+
+    if (parentFrag.isRightSideOfLateral()) {
+      return true;
+    }
+
+    return false;
+  }
+
   @Override
   public Prel visitScreen(ScreenPrel prel, MajorFragmentStat s) throws 
RuntimeException {
     s.addScreen(prel);
@@ -71,6 +83,40 @@ public class ExcessiveExchangeIdentifier extends 
BasePrelVisitor<Prel, Excessive
   }
 
   @Override
+  public Prel visitCorrelate(CorrelatePrel prel, MajorFragmentStat s) throws 
RuntimeException {
+    List<RelNode> children = Lists.newArrayList();
+    s.add(prel);
+
+    for (Prel p : prel) {
+      s.add(p);
+    }
+
+    // Traverse the left side of the Lateral join first. Left side of the
+    // Lateral shouldn't have any restrictions on Exchanges.
+    children.add(((Prel)prel.getInput(0)).accept(this, s));
+    // Save the outermost Lateral join so as to unset the flag later.
+    if (topMostLateralJoin == null) {
+      topMostLateralJoin = prel;
+    }
+
+    // Right side of the Lateral shouldn't have any Exchanges. Hence set the
+    // flag so that visitExchange removes the exchanges.
+    s.setRightSideOfLateral(true);
+    children.add(((Prel)prel.getInput(1)).accept(this, s));
+    if (topMostLateralJoin == prel) {
+      topMostLateralJoin = null;
+      s.setRightSideOfLateral(false);
+    }
+    return (Prel) prel.copy(prel.getTraitSet(), children);
+  }
+
+  @Override
+  public Prel visitUnnest(UnnestPrel prel, MajorFragmentStat s) throws 
RuntimeException {
+    s.addUnnest(prel);
+    return prel;
+  }
+
+  @Override
   public Prel visitPrel(Prel prel, MajorFragmentStat s) throws 
RuntimeException {
     List<RelNode> children = Lists.newArrayList();
     s.add(prel);
@@ -98,6 +144,7 @@ public class ExcessiveExchangeIdentifier extends 
BasePrelVisitor<Prel, Excessive
     private double maxRows = 0d;
     private int maxWidth = Integer.MAX_VALUE;
     private boolean isMultiSubScan = false;
+    private boolean rightSideOfLateral = false;
 
     public void add(Prel prel) {
       maxRows = 
Math.max(prel.estimateRowCount(prel.getCluster().getMetadataQuery()), maxRows);
@@ -130,9 +177,20 @@ public class ExcessiveExchangeIdentifier extends 
BasePrelVisitor<Prel, Excessive
       return w == 1;
     }
 
+    public boolean isRightSideOfLateral() {
+      return this.rightSideOfLateral;
+    }
+
+    public void addUnnest(UnnestPrel prel) {
+      add(prel);
+    }
+
+    public void setRightSideOfLateral(boolean rightSideOfLateral) {
+      this.rightSideOfLateral = rightSideOfLateral;
+    }
+
     public boolean isDistributionStrict() {
       return distributionAffinity == DistributionAffinity.HARD;
     }
   }
-
 }
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestLateralPlans.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestLateralPlans.java
index 9e19729..00ab971 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestLateralPlans.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestLateralPlans.java
@@ -18,10 +18,13 @@
 package org.apache.drill.exec.physical.impl.lateraljoin;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 import org.apache.drill.PlanTestBase;
 import org.apache.drill.common.exceptions.UserRemoteException;
 import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.test.BaseTestQuery;
 import org.apache.drill.test.ClientFixture;
 import org.apache.drill.test.ClusterFixture;
@@ -30,10 +33,18 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.Ignore;
 
+import java.nio.file.Paths;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
 public class TestLateralPlans extends BaseTestQuery {
+  private static final String regularTestFile_1 = "cust_order_10_1.json";
+  private static final String regularTestFile_2 = "cust_order_10_2.json";
 
   @BeforeClass
   public static void enableUnnestLateral() throws Exception {
+    dirTestWatcher.copyResourceToRoot(Paths.get("lateraljoin", 
"multipleFiles", regularTestFile_1));
+    dirTestWatcher.copyResourceToRoot(Paths.get("lateraljoin", 
"multipleFiles", regularTestFile_2));
     test("alter session set `planner.enable_unnest_lateral`=true");
   }
 
@@ -255,7 +266,7 @@ public class TestLateralPlans extends BaseTestQuery {
           .sql(Sql)
           .run();
     } catch (UserRemoteException ex) {
-      assert(ex.getMessage().contains("Alias table and column name are 
required for UNNEST"));
+      assertTrue(ex.getMessage().contains("Alias table and column name are 
required for UNNEST"));
     }
   }
 
@@ -272,7 +283,156 @@ public class TestLateralPlans extends BaseTestQuery {
           .sql(Sql)
           .run();
     } catch (UserRemoteException ex) {
-      assert(ex.getMessage().contains("Alias table and column name are 
required for UNNEST"));
+      assertTrue(ex.getMessage().contains("Alias table and column name are 
required for UNNEST"));
+    }
+  }
+
+  
/***********************************************************************************************
+   Following test cases are introduced to make sure no exchanges are present 
on right side of
+   Lateral join.
+   
**********************************************************************************************/
+
+  @Test
+  public void testNoExchangeWithAggWithoutGrpBy() throws Exception {
+    String Sql = "select d1.totalprice from dfs.`lateraljoin/multipleFiles` 
t," +
+            " lateral ( select sum(t2.ord.o_totalprice) as totalprice from 
unnest(t.c_orders) t2(ord)) d1";
+    ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
+            .setOptionDefault(ExecConstants.ENABLE_UNNEST_LATERAL_KEY, true)
+            .setOptionDefault(ExecConstants.SLICE_TARGET, 1);
+
+    try (ClusterFixture cluster = builder.build();
+         ClientFixture client = cluster.clientFixture()) {
+      String explain = client.queryBuilder().sql(Sql).explainText();
+      String rightChild = getRightChildOfLateral(explain);
+      assertFalse(rightChild.contains("Exchange"));
+    }
+  }
+
+  @Test
+  public void testNoExchangeWithStreamAggWithGrpBy() throws Exception {
+    String Sql = "select d1.totalprice from dfs.`lateraljoin/multipleFiles` 
t," +
+            " lateral ( select sum(t2.ord.o_totalprice) as totalprice from 
unnest(t.c_orders) t2(ord) group by t2.ord.o_orderkey) d1";
+    ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
+            .setOptionDefault(ExecConstants.ENABLE_UNNEST_LATERAL_KEY, true)
+            .setOptionDefault(ExecConstants.SLICE_TARGET, 1)
+            .setOptionDefault(PlannerSettings.HASHAGG.getOptionName(), false)
+            .setOptionDefault(PlannerSettings.STREAMAGG.getOptionName(), true);
+
+    try (ClusterFixture cluster = builder.build();
+         ClientFixture client = cluster.clientFixture()) {
+      String explain = client.queryBuilder().sql(Sql).explainText();
+      String rightChild = getRightChildOfLateral(explain);
+      assertFalse(rightChild.contains("Exchange"));
     }
   }
+
+  @Test
+  public void testNoExchangeWithHashAggWithGrpBy() throws Exception {
+    String Sql = "select d1.totalprice from dfs.`lateraljoin/multipleFiles` 
t," +
+            " lateral ( select sum(t2.ord.o_totalprice) as totalprice from 
unnest(t.c_orders) t2(ord) group by t2.ord.o_orderkey) d1";
+    ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
+            .setOptionDefault(ExecConstants.ENABLE_UNNEST_LATERAL_KEY, true)
+            .setOptionDefault(ExecConstants.SLICE_TARGET, 1)
+            .setOptionDefault(PlannerSettings.HASHAGG.getOptionName(), true)
+            .setOptionDefault(PlannerSettings.STREAMAGG.getOptionName(), 
false);
+
+    try (ClusterFixture cluster = builder.build();
+         ClientFixture client = cluster.clientFixture()) {
+      String explain = client.queryBuilder().sql(Sql).explainText();
+      String rightChild = getRightChildOfLateral(explain);
+      assertFalse(rightChild.contains("Exchange"));
+    }
+  }
+
+  @Test
+  public void testNoExchangeWithOrderByWithoutLimit() throws Exception {
+    String Sql = "select d1.totalprice from dfs.`lateraljoin/multipleFiles` 
t," +
+            " lateral ( select t2.ord.o_totalprice as totalprice from 
unnest(t.c_orders) t2(ord) order by t2.ord.o_orderkey) d1";
+    ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
+            .setOptionDefault(ExecConstants.ENABLE_UNNEST_LATERAL_KEY, true)
+            .setOptionDefault(ExecConstants.SLICE_TARGET, 1);
+
+    try (ClusterFixture cluster = builder.build();
+         ClientFixture client = cluster.clientFixture()) {
+      String explain = client.queryBuilder().sql(Sql).explainText();
+      String rightChild = getRightChildOfLateral(explain);
+      assertFalse(rightChild.contains("Exchange"));
+    }
+  }
+
+  @Test
+  public void testNoExchangeWithOrderByLimit() throws Exception {
+    String Sql = "select d1.totalprice from dfs.`lateraljoin/multipleFiles` 
t," +
+            " lateral ( select t2.ord.o_totalprice as totalprice from 
unnest(t.c_orders) t2(ord) order by t2.ord.o_orderkey limit 10) d1";
+    ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
+            .setOptionDefault(ExecConstants.ENABLE_UNNEST_LATERAL_KEY, true)
+            .setOptionDefault(ExecConstants.SLICE_TARGET, 1);
+
+    try (ClusterFixture cluster = builder.build();
+         ClientFixture client = cluster.clientFixture()) {
+      String explain = client.queryBuilder().sql(Sql).explainText();
+      String rightChild = getRightChildOfLateral(explain);
+      assertFalse(rightChild.contains("Exchange"));
+    }
+  }
+
+
+  @Test
+  public void testNoExchangeWithLateralsDownStreamJoin() throws Exception {
+    String Sql = "select d1.totalprice from dfs.`lateraljoin/multipleFiles` t, 
dfs.`lateraljoin/multipleFiles` t2, " +
+            " lateral ( select t2.ord.o_totalprice as totalprice from 
unnest(t.c_orders) t2(ord) order by t2.ord.o_orderkey limit 10) d1" +
+            " where t.c_name = t2.c_name";
+    ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
+            .setOptionDefault(ExecConstants.ENABLE_UNNEST_LATERAL_KEY, true)
+            .setOptionDefault(ExecConstants.SLICE_TARGET, 1);
+
+    try (ClusterFixture cluster = builder.build();
+         ClientFixture client = cluster.clientFixture()) {
+      String explain = client.queryBuilder().sql(Sql).explainText();
+      String rightChild = getRightChildOfLateral(explain);
+      assertFalse(rightChild.contains("Exchange"));
+    }
+  }
+
+  @Test
+  public void testNoExchangeWithLateralsDownStreamUnion() throws Exception {
+    String Sql = "select t.c_name from dfs.`lateraljoin/multipleFiles` t union 
all " +
+            " select t.c_name from dfs.`lateraljoin/multipleFiles` t, " +
+                    " lateral ( select t2.ord.o_totalprice as totalprice from 
unnest(t.c_orders) t2(ord) order by t2.ord.o_orderkey limit 10) d1";
+    ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
+            .setOptionDefault(ExecConstants.ENABLE_UNNEST_LATERAL_KEY, true)
+            .setOptionDefault(ExecConstants.SLICE_TARGET, 1);
+
+    try (ClusterFixture cluster = builder.build();
+         ClientFixture client = cluster.clientFixture()) {
+      String explain = client.queryBuilder().sql(Sql).explainText();
+      String rightChild = getRightChildOfLateral(explain);
+      assertFalse(rightChild.contains("Exchange"));
+    }
+  }
+
+  @Test
+  public void testNoExchangeWithLateralsDownStreamAgg() throws Exception {
+    String Sql = "select sum(d1.totalprice) from 
dfs.`lateraljoin/multipleFiles` t, " +
+            " lateral ( select t2.ord.o_totalprice as totalprice from 
unnest(t.c_orders) t2(ord) order by t2.ord.o_orderkey limit 10) d1 group by 
t.c_custkey";
+    ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
+            .setOptionDefault(ExecConstants.ENABLE_UNNEST_LATERAL_KEY, true)
+            .setOptionDefault(ExecConstants.SLICE_TARGET, 1)
+            .setOptionDefault(PlannerSettings.HASHAGG.getOptionName(), false)
+            .setOptionDefault(PlannerSettings.STREAMAGG.getOptionName(), true);
+
+    try (ClusterFixture cluster = builder.build();
+         ClientFixture client = cluster.clientFixture()) {
+      String explain = client.queryBuilder().sql(Sql).explainText();
+      String rightChild = getRightChildOfLateral(explain);
+      assertFalse(rightChild.contains("Exchange"));
+    }
+  }
+
+  private String getRightChildOfLateral(String explain) throws Exception {
+    Matcher matcher = Pattern.compile("Correlate.*Unnest", Pattern.MULTILINE | 
Pattern.DOTALL).matcher(explain);
+    assertTrue (matcher.find());
+    String CorrelateUnnest = matcher.group(0);
+    return CorrelateUnnest.substring(CorrelateUnnest.lastIndexOf("Scan"));
+  }
 }

-- 
To stop receiving notification emails like this one, please contact
par...@apache.org.

Reply via email to