This is an automated email from the ASF dual-hosted git repository. parthc pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/drill.git
commit fde66d776cb16e77c1b3436c8a21364a43fb5f8f Author: Hanumath Rao Maduri <hanu....@gmail.com> AuthorDate: Wed May 30 17:59:51 2018 -0700 DRILL-6456: Planner shouldn't create any exchanges on the right side of Lateral Join. This closes #1299 --- .../visitor/ExcessiveExchangeIdentifier.java | 72 ++++++++- .../impl/lateraljoin/TestLateralPlans.java | 164 ++++++++++++++++++++- 2 files changed, 227 insertions(+), 9 deletions(-) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/ExcessiveExchangeIdentifier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/ExcessiveExchangeIdentifier.java index 7bfe214..b4ed5e0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/ExcessiveExchangeIdentifier.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/ExcessiveExchangeIdentifier.java @@ -19,8 +19,8 @@ package org.apache.drill.exec.planner.physical.visitor; import java.util.Collections; import java.util.List; - import org.apache.drill.exec.planner.fragment.DistributionAffinity; +import org.apache.drill.exec.planner.physical.CorrelatePrel; import org.apache.drill.exec.planner.physical.ExchangePrel; import org.apache.drill.exec.planner.physical.Prel; import org.apache.drill.exec.planner.physical.ScanPrel; @@ -28,9 +28,11 @@ import org.apache.drill.exec.planner.physical.ScreenPrel; import org.apache.calcite.rel.RelNode; import com.google.common.collect.Lists; +import org.apache.drill.exec.planner.physical.UnnestPrel; public class ExcessiveExchangeIdentifier extends BasePrelVisitor<Prel, ExcessiveExchangeIdentifier.MajorFragmentStat, RuntimeException> { private final long targetSliceSize; + private CorrelatePrel topMostLateralJoin = null; public ExcessiveExchangeIdentifier(long targetSliceSize) { this.targetSliceSize = targetSliceSize; @@ -45,18 +47,28 @@ public class ExcessiveExchangeIdentifier extends BasePrelVisitor<Prel, Excessive public Prel visitExchange(ExchangePrel prel, MajorFragmentStat parent) throws RuntimeException { parent.add(prel); MajorFragmentStat newFrag = new MajorFragmentStat(); + newFrag.setRightSideOfLateral(parent.isRightSideOfLateral()); Prel newChild = ((Prel) prel.getInput()).accept(this, newFrag); - - if (newFrag.isSingular() && parent.isSingular() && - // if one of them has strict distribution or none, we can remove the exchange - (!newFrag.isDistributionStrict() || !parent.isDistributionStrict()) - ) { + if (canRemoveExchange(parent, newFrag)) { return newChild; } else { return (Prel) prel.copy(prel.getTraitSet(), Collections.singletonList((RelNode) newChild)); } } + private boolean canRemoveExchange(MajorFragmentStat parentFrag, MajorFragmentStat childFrag) { + if (childFrag.isSingular() && parentFrag.isSingular() && + (!childFrag.isDistributionStrict() || !parentFrag.isDistributionStrict())) { + return true; + } + + if (parentFrag.isRightSideOfLateral()) { + return true; + } + + return false; + } + @Override public Prel visitScreen(ScreenPrel prel, MajorFragmentStat s) throws RuntimeException { s.addScreen(prel); @@ -71,6 +83,40 @@ public class ExcessiveExchangeIdentifier extends BasePrelVisitor<Prel, Excessive } @Override + public Prel visitCorrelate(CorrelatePrel prel, MajorFragmentStat s) throws RuntimeException { + List<RelNode> children = Lists.newArrayList(); + s.add(prel); + + for (Prel p : prel) { + s.add(p); + } + + // Traverse the left side of the Lateral join first. Left side of the + // Lateral shouldn't have any restrictions on Exchanges. + children.add(((Prel)prel.getInput(0)).accept(this, s)); + // Save the outermost Lateral join so as to unset the flag later. + if (topMostLateralJoin == null) { + topMostLateralJoin = prel; + } + + // Right side of the Lateral shouldn't have any Exchanges. Hence set the + // flag so that visitExchange removes the exchanges. + s.setRightSideOfLateral(true); + children.add(((Prel)prel.getInput(1)).accept(this, s)); + if (topMostLateralJoin == prel) { + topMostLateralJoin = null; + s.setRightSideOfLateral(false); + } + return (Prel) prel.copy(prel.getTraitSet(), children); + } + + @Override + public Prel visitUnnest(UnnestPrel prel, MajorFragmentStat s) throws RuntimeException { + s.addUnnest(prel); + return prel; + } + + @Override public Prel visitPrel(Prel prel, MajorFragmentStat s) throws RuntimeException { List<RelNode> children = Lists.newArrayList(); s.add(prel); @@ -98,6 +144,7 @@ public class ExcessiveExchangeIdentifier extends BasePrelVisitor<Prel, Excessive private double maxRows = 0d; private int maxWidth = Integer.MAX_VALUE; private boolean isMultiSubScan = false; + private boolean rightSideOfLateral = false; public void add(Prel prel) { maxRows = Math.max(prel.estimateRowCount(prel.getCluster().getMetadataQuery()), maxRows); @@ -130,9 +177,20 @@ public class ExcessiveExchangeIdentifier extends BasePrelVisitor<Prel, Excessive return w == 1; } + public boolean isRightSideOfLateral() { + return this.rightSideOfLateral; + } + + public void addUnnest(UnnestPrel prel) { + add(prel); + } + + public void setRightSideOfLateral(boolean rightSideOfLateral) { + this.rightSideOfLateral = rightSideOfLateral; + } + public boolean isDistributionStrict() { return distributionAffinity == DistributionAffinity.HARD; } } - } diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestLateralPlans.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestLateralPlans.java index 9e19729..00ab971 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestLateralPlans.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestLateralPlans.java @@ -18,10 +18,13 @@ package org.apache.drill.exec.physical.impl.lateraljoin; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import org.apache.drill.PlanTestBase; import org.apache.drill.common.exceptions.UserRemoteException; import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.planner.physical.PlannerSettings; import org.apache.drill.test.BaseTestQuery; import org.apache.drill.test.ClientFixture; import org.apache.drill.test.ClusterFixture; @@ -30,10 +33,18 @@ import org.junit.BeforeClass; import org.junit.Test; import org.junit.Ignore; +import java.nio.file.Paths; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + public class TestLateralPlans extends BaseTestQuery { + private static final String regularTestFile_1 = "cust_order_10_1.json"; + private static final String regularTestFile_2 = "cust_order_10_2.json"; @BeforeClass public static void enableUnnestLateral() throws Exception { + dirTestWatcher.copyResourceToRoot(Paths.get("lateraljoin", "multipleFiles", regularTestFile_1)); + dirTestWatcher.copyResourceToRoot(Paths.get("lateraljoin", "multipleFiles", regularTestFile_2)); test("alter session set `planner.enable_unnest_lateral`=true"); } @@ -255,7 +266,7 @@ public class TestLateralPlans extends BaseTestQuery { .sql(Sql) .run(); } catch (UserRemoteException ex) { - assert(ex.getMessage().contains("Alias table and column name are required for UNNEST")); + assertTrue(ex.getMessage().contains("Alias table and column name are required for UNNEST")); } } @@ -272,7 +283,156 @@ public class TestLateralPlans extends BaseTestQuery { .sql(Sql) .run(); } catch (UserRemoteException ex) { - assert(ex.getMessage().contains("Alias table and column name are required for UNNEST")); + assertTrue(ex.getMessage().contains("Alias table and column name are required for UNNEST")); + } + } + + /*********************************************************************************************** + Following test cases are introduced to make sure no exchanges are present on right side of + Lateral join. + **********************************************************************************************/ + + @Test + public void testNoExchangeWithAggWithoutGrpBy() throws Exception { + String Sql = "select d1.totalprice from dfs.`lateraljoin/multipleFiles` t," + + " lateral ( select sum(t2.ord.o_totalprice) as totalprice from unnest(t.c_orders) t2(ord)) d1"; + ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher) + .setOptionDefault(ExecConstants.ENABLE_UNNEST_LATERAL_KEY, true) + .setOptionDefault(ExecConstants.SLICE_TARGET, 1); + + try (ClusterFixture cluster = builder.build(); + ClientFixture client = cluster.clientFixture()) { + String explain = client.queryBuilder().sql(Sql).explainText(); + String rightChild = getRightChildOfLateral(explain); + assertFalse(rightChild.contains("Exchange")); + } + } + + @Test + public void testNoExchangeWithStreamAggWithGrpBy() throws Exception { + String Sql = "select d1.totalprice from dfs.`lateraljoin/multipleFiles` t," + + " lateral ( select sum(t2.ord.o_totalprice) as totalprice from unnest(t.c_orders) t2(ord) group by t2.ord.o_orderkey) d1"; + ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher) + .setOptionDefault(ExecConstants.ENABLE_UNNEST_LATERAL_KEY, true) + .setOptionDefault(ExecConstants.SLICE_TARGET, 1) + .setOptionDefault(PlannerSettings.HASHAGG.getOptionName(), false) + .setOptionDefault(PlannerSettings.STREAMAGG.getOptionName(), true); + + try (ClusterFixture cluster = builder.build(); + ClientFixture client = cluster.clientFixture()) { + String explain = client.queryBuilder().sql(Sql).explainText(); + String rightChild = getRightChildOfLateral(explain); + assertFalse(rightChild.contains("Exchange")); } } + + @Test + public void testNoExchangeWithHashAggWithGrpBy() throws Exception { + String Sql = "select d1.totalprice from dfs.`lateraljoin/multipleFiles` t," + + " lateral ( select sum(t2.ord.o_totalprice) as totalprice from unnest(t.c_orders) t2(ord) group by t2.ord.o_orderkey) d1"; + ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher) + .setOptionDefault(ExecConstants.ENABLE_UNNEST_LATERAL_KEY, true) + .setOptionDefault(ExecConstants.SLICE_TARGET, 1) + .setOptionDefault(PlannerSettings.HASHAGG.getOptionName(), true) + .setOptionDefault(PlannerSettings.STREAMAGG.getOptionName(), false); + + try (ClusterFixture cluster = builder.build(); + ClientFixture client = cluster.clientFixture()) { + String explain = client.queryBuilder().sql(Sql).explainText(); + String rightChild = getRightChildOfLateral(explain); + assertFalse(rightChild.contains("Exchange")); + } + } + + @Test + public void testNoExchangeWithOrderByWithoutLimit() throws Exception { + String Sql = "select d1.totalprice from dfs.`lateraljoin/multipleFiles` t," + + " lateral ( select t2.ord.o_totalprice as totalprice from unnest(t.c_orders) t2(ord) order by t2.ord.o_orderkey) d1"; + ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher) + .setOptionDefault(ExecConstants.ENABLE_UNNEST_LATERAL_KEY, true) + .setOptionDefault(ExecConstants.SLICE_TARGET, 1); + + try (ClusterFixture cluster = builder.build(); + ClientFixture client = cluster.clientFixture()) { + String explain = client.queryBuilder().sql(Sql).explainText(); + String rightChild = getRightChildOfLateral(explain); + assertFalse(rightChild.contains("Exchange")); + } + } + + @Test + public void testNoExchangeWithOrderByLimit() throws Exception { + String Sql = "select d1.totalprice from dfs.`lateraljoin/multipleFiles` t," + + " lateral ( select t2.ord.o_totalprice as totalprice from unnest(t.c_orders) t2(ord) order by t2.ord.o_orderkey limit 10) d1"; + ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher) + .setOptionDefault(ExecConstants.ENABLE_UNNEST_LATERAL_KEY, true) + .setOptionDefault(ExecConstants.SLICE_TARGET, 1); + + try (ClusterFixture cluster = builder.build(); + ClientFixture client = cluster.clientFixture()) { + String explain = client.queryBuilder().sql(Sql).explainText(); + String rightChild = getRightChildOfLateral(explain); + assertFalse(rightChild.contains("Exchange")); + } + } + + + @Test + public void testNoExchangeWithLateralsDownStreamJoin() throws Exception { + String Sql = "select d1.totalprice from dfs.`lateraljoin/multipleFiles` t, dfs.`lateraljoin/multipleFiles` t2, " + + " lateral ( select t2.ord.o_totalprice as totalprice from unnest(t.c_orders) t2(ord) order by t2.ord.o_orderkey limit 10) d1" + + " where t.c_name = t2.c_name"; + ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher) + .setOptionDefault(ExecConstants.ENABLE_UNNEST_LATERAL_KEY, true) + .setOptionDefault(ExecConstants.SLICE_TARGET, 1); + + try (ClusterFixture cluster = builder.build(); + ClientFixture client = cluster.clientFixture()) { + String explain = client.queryBuilder().sql(Sql).explainText(); + String rightChild = getRightChildOfLateral(explain); + assertFalse(rightChild.contains("Exchange")); + } + } + + @Test + public void testNoExchangeWithLateralsDownStreamUnion() throws Exception { + String Sql = "select t.c_name from dfs.`lateraljoin/multipleFiles` t union all " + + " select t.c_name from dfs.`lateraljoin/multipleFiles` t, " + + " lateral ( select t2.ord.o_totalprice as totalprice from unnest(t.c_orders) t2(ord) order by t2.ord.o_orderkey limit 10) d1"; + ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher) + .setOptionDefault(ExecConstants.ENABLE_UNNEST_LATERAL_KEY, true) + .setOptionDefault(ExecConstants.SLICE_TARGET, 1); + + try (ClusterFixture cluster = builder.build(); + ClientFixture client = cluster.clientFixture()) { + String explain = client.queryBuilder().sql(Sql).explainText(); + String rightChild = getRightChildOfLateral(explain); + assertFalse(rightChild.contains("Exchange")); + } + } + + @Test + public void testNoExchangeWithLateralsDownStreamAgg() throws Exception { + String Sql = "select sum(d1.totalprice) from dfs.`lateraljoin/multipleFiles` t, " + + " lateral ( select t2.ord.o_totalprice as totalprice from unnest(t.c_orders) t2(ord) order by t2.ord.o_orderkey limit 10) d1 group by t.c_custkey"; + ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher) + .setOptionDefault(ExecConstants.ENABLE_UNNEST_LATERAL_KEY, true) + .setOptionDefault(ExecConstants.SLICE_TARGET, 1) + .setOptionDefault(PlannerSettings.HASHAGG.getOptionName(), false) + .setOptionDefault(PlannerSettings.STREAMAGG.getOptionName(), true); + + try (ClusterFixture cluster = builder.build(); + ClientFixture client = cluster.clientFixture()) { + String explain = client.queryBuilder().sql(Sql).explainText(); + String rightChild = getRightChildOfLateral(explain); + assertFalse(rightChild.contains("Exchange")); + } + } + + private String getRightChildOfLateral(String explain) throws Exception { + Matcher matcher = Pattern.compile("Correlate.*Unnest", Pattern.MULTILINE | Pattern.DOTALL).matcher(explain); + assertTrue (matcher.find()); + String CorrelateUnnest = matcher.group(0); + return CorrelateUnnest.substring(CorrelateUnnest.lastIndexOf("Scan")); + } } -- To stop receiving notification emails like this one, please contact par...@apache.org.