PHOENIX-1556 Base hash versus sort merge join decision on cost
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/6b40a36b Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/6b40a36b Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/6b40a36b Branch: refs/heads/4.x-HBase-1.1 Commit: 6b40a36b51eb92361dc2cfdca291f4391c7fcc01 Parents: 56cdcb9 Author: maryannxue <maryann....@gmail.com> Authored: Mon Feb 12 14:07:30 2018 -0800 Committer: maryannxue <maryann....@gmail.com> Committed: Tue Mar 13 16:48:01 2018 -0700 ---------------------------------------------------------------------- .../phoenix/end2end/CostBasedDecisionIT.java | 420 ++++++++++++----- .../apache/phoenix/compile/JoinCompiler.java | 43 ++ .../phoenix/compile/ListJarsQueryPlan.java | 8 +- .../apache/phoenix/compile/QueryCompiler.java | 449 ++++++++++--------- .../org/apache/phoenix/compile/QueryPlan.java | 2 + .../apache/phoenix/compile/TraceQueryPlan.java | 6 + .../apache/phoenix/execute/AggregatePlan.java | 41 +- .../phoenix/execute/ClientAggregatePlan.java | 46 +- .../phoenix/execute/ClientProcessingPlan.java | 4 + .../apache/phoenix/execute/ClientScanPlan.java | 22 +- .../apache/phoenix/execute/CorrelatePlan.java | 26 +- .../apache/phoenix/execute/CursorFetchPlan.java | 6 + .../apache/phoenix/execute/HashJoinPlan.java | 128 ++++-- .../execute/LiteralResultIterationPlan.java | 6 + .../org/apache/phoenix/execute/ScanPlan.java | 14 +- .../phoenix/execute/SortMergeJoinPlan.java | 20 +- .../phoenix/execute/TupleProjectionPlan.java | 6 + .../org/apache/phoenix/execute/UnionPlan.java | 12 +- .../apache/phoenix/execute/UnnestArrayPlan.java | 6 + .../execute/visitor/AvgRowWidthVisitor.java | 205 +++++++++ .../execute/visitor/ByteCountVisitor.java | 125 ++++++ .../execute/visitor/QueryPlanVisitor.java | 46 ++ .../execute/visitor/RowCountVisitor.java | 335 ++++++++++++++ .../apache/phoenix/jdbc/PhoenixStatement.java | 6 + .../java/org/apache/phoenix/util/CostUtil.java | 61 +-- 
.../query/ParallelIteratorsSplitTest.java | 6 + 26 files changed, 1612 insertions(+), 437 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b40a36b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CostBasedDecisionIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CostBasedDecisionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CostBasedDecisionIT.java index a3584ce..493855a 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CostBasedDecisionIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CostBasedDecisionIT.java @@ -32,12 +32,16 @@ import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.QueryUtil; import org.apache.phoenix.util.ReadOnlyProps; + import org.junit.BeforeClass; import org.junit.Test; import com.google.common.collect.Maps; public class CostBasedDecisionIT extends BaseUniqueNamesOwnClusterIT { + private final String testTable500; + private final String testTable990; + private final String testTable1000; @BeforeClass public static void doSetup() throws Exception { @@ -46,9 +50,16 @@ public class CostBasedDecisionIT extends BaseUniqueNamesOwnClusterIT { props.put(QueryServices.STATS_UPDATE_FREQ_MS_ATTRIB, Long.toString(5)); props.put(QueryServices.USE_STATS_FOR_PARALLELIZATION, Boolean.toString(true)); props.put(QueryServices.COST_BASED_OPTIMIZER_ENABLED, Boolean.toString(true)); + props.put(QueryServices.MAX_SERVER_CACHE_SIZE_ATTRIB, Long.toString(150000)); setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); } + public CostBasedDecisionIT() throws Exception { + testTable500 = initTestTableValues(500); + testTable990 = initTestTableValues(990); + testTable1000 = initTestTableValues(1000); + } + @Test public void testCostOverridesStaticPlanOrdering1()
throws Exception { Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); @@ -64,10 +75,7 @@ public class CostBasedDecisionIT extends BaseUniqueNamesOwnClusterIT { String query = "SELECT rowkey, c1, c2 FROM " + tableName + " where c1 LIKE 'X0%' ORDER BY rowkey"; // Use the data table plan that opts out order-by when stats are not available. - ResultSet rs = conn.createStatement().executeQuery("explain " + query); - String plan = QueryUtil.getExplainPlan(rs); - assertTrue("Expected 'FULL SCAN' in the plan:\n" + plan + ".", - plan.contains("FULL SCAN")); + verifyQueryPlan(query, "FULL SCAN"); PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " (rowkey, c1, c2) VALUES (?, ?, ?)"); for (int i = 0; i < 10000; i++) { @@ -81,10 +89,7 @@ public class CostBasedDecisionIT extends BaseUniqueNamesOwnClusterIT { conn.createStatement().execute("UPDATE STATISTICS " + tableName); // Use the index table plan that has a lower cost when stats become available. - rs = conn.createStatement().executeQuery("explain " + query); - plan = QueryUtil.getExplainPlan(rs); - assertTrue("Expected 'RANGE SCAN' in the plan:\n" + plan + ".", - plan.contains("RANGE SCAN")); + verifyQueryPlan(query, "RANGE SCAN"); } finally { conn.close(); } @@ -103,12 +108,12 @@ public class CostBasedDecisionIT extends BaseUniqueNamesOwnClusterIT { "c2 VARCHAR)"); conn.createStatement().execute("CREATE LOCAL INDEX " + tableName + "_idx ON " + tableName + " (c1)"); - String query = "SELECT rowkey, max(c1), max(c2) FROM " + tableName + " where c1 LIKE 'X%' GROUP BY rowkey"; + String query = "SELECT c1, max(rowkey), max(c2) FROM " + tableName + " where rowkey <= 'z' GROUP BY c1"; - // Use the index table plan that opts out order-by when stats are not available. + // Use the data table plan with a range scan on the row key when stats are not available.
- ResultSet rs = conn.createStatement().executeQuery("explain " + query); - String plan = QueryUtil.getExplainPlan(rs); - assertTrue("Expected 'RANGE SCAN' in the plan:\n" + plan + ".", - plan.contains("RANGE SCAN")); + verifyQueryPlan(query, + "CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [*] - ['z']\n" + + " SERVER AGGREGATE INTO DISTINCT ROWS BY [C1]\n" + + "CLIENT MERGE SORT"); PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " (rowkey, c1, c2) VALUES (?, ?, ?)"); for (int i = 0; i < 10000; i++) { @@ -124,10 +129,11 @@ public class CostBasedDecisionIT extends BaseUniqueNamesOwnClusterIT { - // Given that the range on C1 is meaningless and group-by becomes - // order-preserving if using the data table, the data table plan should - // come out as the best plan based on the costs. + // Given that the range on ROWKEY is meaningless and group-by becomes + // order-preserving if using the index table, the index table plan should + // come out as the best plan based on the costs. - rs = conn.createStatement().executeQuery("explain " + query); - plan = QueryUtil.getExplainPlan(rs); - assertTrue("Expected 'FULL SCAN' in the plan:\n" + plan + ".", - plan.contains("FULL SCAN")); + verifyQueryPlan(query, + "CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [1]\n" + + " SERVER FILTER BY FIRST KEY ONLY AND \"ROWKEY\" <= 'z'\n" + + " SERVER AGGREGATE INTO ORDERED DISTINCT ROWS BY [\"C1\"]\n" + + "CLIENT MERGE SORT"); } finally { conn.close(); } @@ -150,14 +156,10 @@ public class CostBasedDecisionIT extends BaseUniqueNamesOwnClusterIT { String query = "SELECT * FROM " + tableName + " where c1 BETWEEN 10 AND 20 AND c2 < 9000 AND C3 < 5000"; // Use the idx2 plan with a wider PK slot span when stats are not available.
- ResultSet rs = conn.createStatement().executeQuery("explain " + query); - String plan = QueryUtil.getExplainPlan(rs); - String indexPlan = + verifyQueryPlan(query, "CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [2,*] - [2,9,000]\n" + " SERVER FILTER BY ((\"C1\" >= 10 AND \"C1\" <= 20) AND TO_INTEGER(\"C3\") < 5000)\n" + - "CLIENT MERGE SORT"; - assertTrue("Expected '" + indexPlan + "' in the plan:\n" + plan + ".", - plan.contains(indexPlan)); + "CLIENT MERGE SORT"); PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " (rowkey, c1, c2, c3) VALUES (?, ?, ?, ?)"); for (int i = 0; i < 10000; i++) { @@ -171,14 +173,10 @@ public class CostBasedDecisionIT extends BaseUniqueNamesOwnClusterIT { conn.createStatement().execute("UPDATE STATISTICS " + tableName); // Use the idx2 plan that scans less data when stats become available. - rs = conn.createStatement().executeQuery("explain " + query); - plan = QueryUtil.getExplainPlan(rs); - String dataPlan = + verifyQueryPlan(query, "CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [1,10] - [1,20]\n" + " SERVER FILTER BY (\"C2\" < 9000 AND \"C3\" < 5000)\n" + - "CLIENT MERGE SORT"; - assertTrue("Expected '" + dataPlan + "' in the plan:\n" + plan + ".", - plan.contains(dataPlan)); + "CLIENT MERGE SORT"); } finally { conn.close(); } @@ -201,15 +199,11 @@ public class CostBasedDecisionIT extends BaseUniqueNamesOwnClusterIT { String query = "UPSERT INTO " + tableName + " SELECT * FROM " + tableName + " where c1 BETWEEN 10 AND 20 AND c2 < 9000 AND C3 < 5000"; // Use the idx2 plan with a wider PK slot span when stats are not available.
- ResultSet rs = conn.createStatement().executeQuery("explain " + query); - String plan = QueryUtil.getExplainPlan(rs); - String indexPlan = + verifyQueryPlan(query, "UPSERT SELECT\n" + "CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [2,*] - [2,9,000]\n" + " SERVER FILTER BY ((\"C1\" >= 10 AND \"C1\" <= 20) AND TO_INTEGER(\"C3\") < 5000)\n" + - "CLIENT MERGE SORT"; - assertTrue("Expected '" + indexPlan + "' in the plan:\n" + plan + ".", - plan.contains(indexPlan)); + "CLIENT MERGE SORT"); PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " (rowkey, c1, c2, c3) VALUES (?, ?, ?, ?)"); for (int i = 0; i < 10000; i++) { @@ -223,15 +217,11 @@ public class CostBasedDecisionIT extends BaseUniqueNamesOwnClusterIT { conn.createStatement().execute("UPDATE STATISTICS " + tableName); // Use the idx2 plan that scans less data when stats become available. - rs = conn.createStatement().executeQuery("explain " + query); - plan = QueryUtil.getExplainPlan(rs); - String dataPlan = + verifyQueryPlan(query, "UPSERT SELECT\n" + "CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [1,10] - [1,20]\n" + " SERVER FILTER BY (\"C2\" < 9000 AND \"C3\" < 5000)\n" + - "CLIENT MERGE SORT"; - assertTrue("Expected '" + dataPlan + "' in the plan:\n" + plan + ".", - plan.contains(dataPlan)); + "CLIENT MERGE SORT"); } finally { conn.close(); } @@ -254,15 +244,11 @@ public class CostBasedDecisionIT extends BaseUniqueNamesOwnClusterIT { String query = "DELETE FROM " + tableName + " where c1 BETWEEN 10 AND 20 AND c2 < 9000 AND C3 < 5000"; // Use the idx2 plan with a wider PK slot span when stats are not available.
- ResultSet rs = conn.createStatement().executeQuery("explain " + query); - String plan = QueryUtil.getExplainPlan(rs); - String indexPlan = + verifyQueryPlan(query, "DELETE ROWS\n" + "CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [2,*] - [2,9,000]\n" + " SERVER FILTER BY ((\"C1\" >= 10 AND \"C1\" <= 20) AND TO_INTEGER(\"C3\") < 5000)\n" + - "CLIENT MERGE SORT"; - assertTrue("Expected '" + indexPlan + "' in the plan:\n" + plan + ".", - plan.contains(indexPlan)); + "CLIENT MERGE SORT"); PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " (rowkey, c1, c2, c3) VALUES (?, ?, ?, ?)"); for (int i = 0; i < 10000; i++) { @@ -276,15 +262,11 @@ public class CostBasedDecisionIT extends BaseUniqueNamesOwnClusterIT { conn.createStatement().execute("UPDATE STATISTICS " + tableName); // Use the idx2 plan that scans less data when stats become available. - rs = conn.createStatement().executeQuery("explain " + query); - plan = QueryUtil.getExplainPlan(rs); - String dataPlan = + verifyQueryPlan(query, "DELETE ROWS\n" + "CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [1,10] - [1,20]\n" + " SERVER FILTER BY (\"C2\" < 9000 AND \"C3\" < 5000)\n" + - "CLIENT MERGE SORT"; - assertTrue("Expected '" + dataPlan + "' in the plan:\n" + plan + ".", - plan.contains(dataPlan)); + "CLIENT MERGE SORT"); } finally { conn.close(); } @@ -303,22 +285,17 @@ public class CostBasedDecisionIT extends BaseUniqueNamesOwnClusterIT { "c2 VARCHAR)"); conn.createStatement().execute("CREATE LOCAL INDEX " + tableName + "_idx ON " + tableName + " (c1)"); - String query = "SELECT c1, max(rowkey), max(c2) FROM " + tableName + " where rowkey LIKE 'k%' GROUP BY c1 " - + "UNION ALL SELECT rowkey, max(c1), max(c2) FROM " + tableName + " where c1 LIKE 'X%' GROUP BY rowkey"; + String query = "SELECT c1, max(rowkey), max(c2) FROM " + tableName + " where rowkey <= 'z' GROUP BY c1 " + + "UNION ALL SELECT c1, max(rowkey), max(c2) FROM " + tableName + " where rowkey >= 'a' GROUP BY
c1"; // Use the default plan when stats are not available. - ResultSet rs = conn.createStatement().executeQuery("explain " + query); - String plan = QueryUtil.getExplainPlan(rs); - String defaultPlan = + verifyQueryPlan(query, "UNION ALL OVER 2 QUERIES\n" + - " CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " ['k'] - ['l']\n" + + " CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [*] - ['z']\n" + " SERVER AGGREGATE INTO DISTINCT ROWS BY [C1]\n" + " CLIENT MERGE SORT\n" + - " CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [1,'X'] - [1,'Y']\n" + - " SERVER FILTER BY FIRST KEY ONLY\n" + - " SERVER AGGREGATE INTO DISTINCT ROWS BY [\"ROWKEY\"]\n" + - " CLIENT MERGE SORT"; - assertTrue("Expected '" + defaultPlan + "' in the plan:\n" + plan + ".", - plan.contains(defaultPlan)); + " CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " ['a'] - [*]\n" + + " SERVER AGGREGATE INTO DISTINCT ROWS BY [C1]\n" + + " CLIENT MERGE SORT"); PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " (rowkey, c1, c2) VALUES (?, ?, ?)"); for (int i = 0; i < 10000; i++) { @@ -332,19 +309,16 @@ public class CostBasedDecisionIT extends BaseUniqueNamesOwnClusterIT { conn.createStatement().execute("UPDATE STATISTICS " + tableName); // Use the optimal plan based on cost when stats become available.
- rs = conn.createStatement().executeQuery("explain " + query); - plan = QueryUtil.getExplainPlan(rs); - String optimizedPlan = + verifyQueryPlan(query, "UNION ALL OVER 2 QUERIES\n" + " CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [1]\n" + - " SERVER FILTER BY FIRST KEY ONLY AND \"ROWKEY\" LIKE 'k%'\n" + + " SERVER FILTER BY FIRST KEY ONLY AND \"ROWKEY\" <= 'z'\n" + " SERVER AGGREGATE INTO ORDERED DISTINCT ROWS BY [\"C1\"]\n" + " CLIENT MERGE SORT\n" + - " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + tableName + "\n" + - " SERVER FILTER BY C1 LIKE 'X%'\n" + - " SERVER AGGREGATE INTO ORDERED DISTINCT ROWS BY [ROWKEY]"; - assertTrue("Expected '" + optimizedPlan + "' in the plan:\n" + plan + ".", - plan.contains(optimizedPlan)); + " CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [1]\n" + + " SERVER FILTER BY FIRST KEY ONLY AND \"ROWKEY\" >= 'a'\n" + + " SERVER AGGREGATE INTO ORDERED DISTINCT ROWS BY [\"C1\"]\n" + + " CLIENT MERGE SORT"); } finally { conn.close(); } @@ -363,23 +337,18 @@ public class CostBasedDecisionIT extends BaseUniqueNamesOwnClusterIT { "c2 VARCHAR)"); conn.createStatement().execute("CREATE LOCAL INDEX " + tableName + "_idx ON " + tableName + " (c1)"); - String query = "SELECT t1.rowkey, t1.c1, t1.c2, mc1, mc2 FROM " + tableName + " t1 " - + "JOIN (SELECT rowkey, max(c1) mc1, max(c2) mc2 FROM " + tableName + " where c1 LIKE 'X%' GROUP BY rowkey) t2 " - + "ON t1.rowkey = t2.rowkey WHERE t1.c1 LIKE 'X0%' ORDER BY t1.rowkey"; + String query = "SELECT t1.rowkey, t1.c1, t1.c2, t2.c1, mc2 FROM " + tableName + " t1 " + + "JOIN (SELECT c1, max(rowkey) mrk, max(c2) mc2 FROM " + tableName + " where rowkey <= 'z' GROUP BY c1) t2 " + + "ON t1.rowkey = t2.mrk WHERE t1.c1 LIKE 'X0%' ORDER BY t1.rowkey"; // Use the default plan when stats are not available.
- ResultSet rs = conn.createStatement().executeQuery("explain " + query); - String plan = QueryUtil.getExplainPlan(rs); - String defaultPlan = + verifyQueryPlan(query, "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + tableName + "\n" + " SERVER FILTER BY C1 LIKE 'X0%'\n" + " PARALLEL INNER-JOIN TABLE 0\n" + - " CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [1,'X'] - [1,'Y']\n" + - " SERVER FILTER BY FIRST KEY ONLY\n" + - " SERVER AGGREGATE INTO DISTINCT ROWS BY [\"ROWKEY\"]\n" + + " CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [*] - ['z']\n" + + " SERVER AGGREGATE INTO DISTINCT ROWS BY [C1]\n" + " CLIENT MERGE SORT\n" + - " DYNAMIC SERVER FILTER BY T1.ROWKEY IN (T2.ROWKEY)"; - assertTrue("Expected '" + defaultPlan + "' in the plan:\n" + plan + ".", - plan.contains(defaultPlan)); + " DYNAMIC SERVER FILTER BY T1.ROWKEY IN (T2.MRK)"); PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " (rowkey, c1, c2) VALUES (?, ?, ?)"); for (int i = 0; i < 10000; i++) { @@ -393,20 +362,17 @@ public class CostBasedDecisionIT extends BaseUniqueNamesOwnClusterIT { conn.createStatement().execute("UPDATE STATISTICS " + tableName); // Use the optimal plan based on cost when stats become available.
- rs = conn.createStatement().executeQuery("explain " + query); - plan = QueryUtil.getExplainPlan(rs); - String optimizedPlan = + verifyQueryPlan(query, "CLIENT PARALLEL 626-WAY RANGE SCAN OVER " + tableName + " [1,'X0'] - [1,'X1']\n" + " SERVER FILTER BY FIRST KEY ONLY\n" + " SERVER SORTED BY [\"T1.:ROWKEY\"]\n" + "CLIENT MERGE SORT\n" + " PARALLEL INNER-JOIN TABLE 0\n" + - " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + tableName + "\n" + - " SERVER FILTER BY C1 LIKE 'X%'\n" + - " SERVER AGGREGATE INTO ORDERED DISTINCT ROWS BY [ROWKEY]\n" + - " DYNAMIC SERVER FILTER BY \"T1.:ROWKEY\" IN (T2.ROWKEY)"; - assertTrue("Expected '" + optimizedPlan + "' in the plan:\n" + plan + ".", - plan.contains(optimizedPlan)); + " CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [1]\n" + + " SERVER FILTER BY FIRST KEY ONLY AND \"ROWKEY\" <= 'z'\n" + + " SERVER AGGREGATE INTO ORDERED DISTINCT ROWS BY [\"C1\"]\n" + + " CLIENT MERGE SORT\n" + + " DYNAMIC SERVER FILTER BY \"T1.:ROWKEY\" IN (T2.MRK)"); } finally { conn.close(); } @@ -432,10 +398,7 @@ public class CostBasedDecisionIT extends BaseUniqueNamesOwnClusterIT { String indexPlan = "SERVER FILTER BY FIRST KEY ONLY AND (\"ROWKEY\" >= 1 AND \"ROWKEY\" <= 10)"; // Use the index table plan that opts out order-by when stats are not available. - ResultSet rs = conn.createStatement().executeQuery("explain " + query); - String plan = QueryUtil.getExplainPlan(rs); - assertTrue("Expected '" + indexPlan + "' in the plan:\n" + plan + ".", - plan.contains(indexPlan)); + verifyQueryPlan(query, indexPlan); PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " (rowkey, c1, c2) VALUES (?, ?, ?)"); for (int i = 0; i < 10000; i++) { @@ -449,18 +412,261 @@ public class CostBasedDecisionIT extends BaseUniqueNamesOwnClusterIT { conn.createStatement().execute("UPDATE STATISTICS " + tableName); // Use the data table plan that has a lower cost when stats are available.
- rs = conn.createStatement().executeQuery("explain " + query); - plan = QueryUtil.getExplainPlan(rs); - assertTrue("Expected '" + dataPlan + "' in the plan:\n" + plan + ".", - plan.contains(dataPlan)); + verifyQueryPlan(query, dataPlan); // Use the index table plan as has been hinted. - rs = conn.createStatement().executeQuery("explain " + hintedQuery); - plan = QueryUtil.getExplainPlan(rs); - assertTrue("Expected '" + indexPlan + "' in the plan:\n" + plan + ".", - plan.contains(indexPlan)); + verifyQueryPlan(hintedQuery, indexPlan); } finally { conn.close(); } } + + /** Sort-merge-join w/ both children ordered wins over hash-join. */ + @Test + public void testJoinStrategy() throws Exception { + String q = "SELECT *\n" + + "FROM " + testTable500 + " t1 JOIN " + testTable1000 + " t2\n" + + "ON t1.ID = t2.ID"; + String expected = + "SORT-MERGE-JOIN (INNER) TABLES\n" + + " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + testTable500 + "\n" + + "AND\n" + + " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + testTable1000; + verifyQueryPlan(q, expected); + } + + /** Sort-merge-join w/ both children ordered wins over hash-join in an un-grouped aggregate query. */ + @Test + public void testJoinStrategy2() throws Exception { + String q = "SELECT count(*)\n" + + "FROM " + testTable500 + " t1 JOIN " + testTable1000 + " t2\n" + + "ON t1.ID = t2.ID\n" + + "WHERE t1.COL1 < 200"; + String expected = + "SORT-MERGE-JOIN (INNER) TABLES\n" + + " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + testTable500 + "\n" + + " SERVER FILTER BY COL1 < 200\n" + + "AND (SKIP MERGE)\n" + + " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + testTable1000 + "\n" + + " SERVER FILTER BY FIRST KEY ONLY\n" + + "CLIENT AGGREGATE INTO SINGLE ROW"; + verifyQueryPlan(q, expected); + } + + /** Hash-join w/ PK/FK optimization wins over sort-merge-join w/ larger side ordered.
*/ + @Test + public void testJoinStrategy3() throws Exception { + String q = "SELECT *\n" + + "FROM " + testTable500 + " t1 JOIN " + testTable1000 + " t2\n" + + "ON t1.COL1 = t2.ID\n" + + "WHERE t1.ID > 200"; + String expected = + "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + testTable1000 + "\n" + + " PARALLEL INNER-JOIN TABLE 0\n" + + " CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + testTable500 + " [201] - [*]\n" + + " DYNAMIC SERVER FILTER BY T2.ID IN (T1.COL1)"; + verifyQueryPlan(q, expected); + } + + /** Hash-join w/ PK/FK optimization wins over hash-join w/o PK/FK optimization when two sides are close in size. */ + @Test + public void testJoinStrategy4() throws Exception { + String q = "SELECT *\n" + + "FROM " + testTable990 + " t1 JOIN " + testTable1000 + " t2\n" + + "ON t1.ID = t2.COL1"; + String expected = + "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + testTable990 + "\n" + + " PARALLEL INNER-JOIN TABLE 0\n" + + " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + testTable1000 + "\n" + + " DYNAMIC SERVER FILTER BY T1.ID IN (T2.COL1)"; + verifyQueryPlan(q, expected); + } + + /** Hash-join wins over sort-merge-join w/ smaller side ordered. */ + @Test + public void testJoinStrategy5() throws Exception { + String q = "SELECT *\n" + + "FROM " + testTable500 + " t1 JOIN " + testTable1000 + " t2\n" + + "ON t1.ID = t2.COL1\n" + + "WHERE t1.ID > 200"; + String expected = + "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + testTable1000 + "\n" + + " PARALLEL INNER-JOIN TABLE 0\n" + + " CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + testTable500 + " [201] - [*]"; + verifyQueryPlan(q, expected); + } + + /** Hash-join wins over sort-merge-join w/o any side ordered.
*/ + @Test + public void testJoinStrategy6() throws Exception { + String q = "SELECT *\n" + + "FROM " + testTable500 + " t1 JOIN " + testTable1000 + " t2\n" + + "ON t1.COL1 = t2.COL1\n" + + "WHERE t1.ID > 200"; + String expected = + "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + testTable1000 + "\n" + + " PARALLEL INNER-JOIN TABLE 0\n" + + " CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + testTable500 + " [201] - [*]"; + verifyQueryPlan(q, expected); + } + + /** + * Hash-join wins over sort-merge-join w/ both sides ordered in an order-by query. + * This is because order-by can only be done on the client side after sort-merge-join + * and order-by w/o limit on the client side is very expensive. + */ + @Test + public void testJoinStrategy7() throws Exception { + String q = "SELECT *\n" + + "FROM " + testTable500 + " t1 JOIN " + testTable1000 + " t2\n" + + "ON t1.ID = t2.ID\n" + + "ORDER BY t1.COL1"; + String expected = + "CLIENT PARALLEL 1001-WAY FULL SCAN OVER " + testTable1000 + "\n" + + " SERVER SORTED BY [T1.COL1]\n" + + "CLIENT MERGE SORT\n" + + " PARALLEL INNER-JOIN TABLE 0\n" + + " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + testTable500 + "\n" + + " DYNAMIC SERVER FILTER BY T2.ID IN (T1.ID)"; + verifyQueryPlan(q, expected); + } + + /** + * Sort-merge-join w/ both sides ordered wins over hash-join in an order-by limit query. + * This is because order-by can only be done on the client side after sort-merge-join + * but order-by w/ limit on the client side is less expensive.
+ */ + @Test + public void testJoinStrategy8() throws Exception { + String q = "SELECT *\n" + + "FROM " + testTable500 + " t1 JOIN " + testTable1000 + " t2\n" + + "ON t1.ID = t2.ID\n" + + "ORDER BY t1.COL1 LIMIT 5"; + String expected = + "SORT-MERGE-JOIN (INNER) TABLES\n" + + " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + testTable500 + "\n" + + "AND\n" + + " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + testTable1000 + "\n" + + "CLIENT TOP 5 ROWS SORTED BY [T1.COL1]"; + verifyQueryPlan(q, expected); + } + + /** + * Multi-table join: sort-merge-join chosen since all join keys are PK. + */ + @Test + public void testJoinStrategy9() throws Exception { + String q = "SELECT *\n" + + "FROM " + testTable1000 + " t1 LEFT JOIN " + testTable500 + " t2\n" + + "ON t1.ID = t2.ID AND t2.ID > 200\n" + + "LEFT JOIN " + testTable990 + " t3\n" + + "ON t1.ID = t3.ID AND t3.ID < 100"; + String expected = + "SORT-MERGE-JOIN (LEFT) TABLES\n" + + " SORT-MERGE-JOIN (LEFT) TABLES\n" + + " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + testTable1000 + "\n" + + " AND\n" + + " CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + testTable500 + " [201] - [*]\n" + + "AND\n" + + " CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + testTable990 + " [*] - [100]"; + verifyQueryPlan(q, expected); + } + + /** + * Multi-table join: a mix of join strategies chosen based on cost.
+ */ + @Test + public void testJoinStrategy10() throws Exception { + String q = "SELECT *\n" + + "FROM " + testTable1000 + " t1 JOIN " + testTable500 + " t2\n" + + "ON t1.ID = t2.COL1 AND t2.ID > 200\n" + + "JOIN " + testTable990 + " t3\n" + + "ON t1.ID = t3.ID AND t3.ID < 100"; + String expected = + "SORT-MERGE-JOIN (INNER) TABLES\n" + + " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + testTable1000 + "\n" + + " PARALLEL INNER-JOIN TABLE 0\n" + + " CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + testTable500 + " [201] - [*]\n" + + " DYNAMIC SERVER FILTER BY T1.ID IN (T2.COL1)\n" + + "AND\n" + + " CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + testTable990 + " [*] - [100]"; + verifyQueryPlan(q, expected); + } + + /** + * Multi-table join: hash-join two tables in parallel since two RHS tables are both small + * and can fit in memory at the same time. + */ + @Test + public void testJoinStrategy11() throws Exception { + String q = "SELECT *\n" + + "FROM " + testTable1000 + " t1 JOIN " + testTable500 + " t2\n" + + "ON t1.COL2 = t2.COL1 AND t2.ID > 200\n" + + "JOIN " + testTable990 + " t3\n" + + "ON t1.COL1 = t3.COL2 AND t3.ID < 100"; + String expected = + "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + testTable1000 + "\n" + + " PARALLEL INNER-JOIN TABLE 0\n" + + " CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + testTable500 + " [201] - [*]\n" + + " PARALLEL INNER-JOIN TABLE 1\n" + + " CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + testTable990 + " [*] - [100]"; + verifyQueryPlan(q, expected); + } + + /** + * Multi-table join: similar to {@link #testJoinStrategy11()}, but the two RHS + * tables cannot fit in memory at the same time, and thus a mix of join strategies + * is chosen based on cost.
+ */ + @Test + public void testJoinStrategy12() throws Exception { + String q = "SELECT *\n" + + "FROM " + testTable1000 + " t1 JOIN " + testTable990 + " t2\n" + + "ON t1.COL2 = t2.COL1\n" + + "JOIN " + testTable990 + " t3\n" + + "ON t1.COL1 = t3.COL2"; + String expected = + "SORT-MERGE-JOIN (INNER) TABLES\n" + + " CLIENT PARALLEL 1001-WAY FULL SCAN OVER " + testTable1000 + "\n" + + " SERVER SORTED BY [T1.COL1]\n" + + " CLIENT MERGE SORT\n" + + " PARALLEL INNER-JOIN TABLE 0\n" + + " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + testTable990 + "\n" + + "AND\n" + + " CLIENT PARALLEL 991-WAY FULL SCAN OVER " + testTable990 + "\n" + + " SERVER SORTED BY [T3.COL2]\n" + + " CLIENT MERGE SORT"; + verifyQueryPlan(q, expected); + } + + private static void verifyQueryPlan(String query, String expected) throws Exception { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { + ResultSet rs = conn.createStatement().executeQuery("explain " + query); + String plan = QueryUtil.getExplainPlan(rs); + assertTrue("Expected '" + expected + "' in the plan:\n" + plan + ".", plan.contains(expected)); + } + } + + private static String initTestTableValues(int rows) throws Exception { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { + String tableName = generateUniqueName(); + conn.createStatement().execute("CREATE TABLE " + tableName + " (\n" + + "ID INTEGER NOT NULL PRIMARY KEY,\n" + + "COL1 INTEGER," + + "COL2 INTEGER)"); + PreparedStatement stmt = conn.prepareStatement( + "UPSERT INTO " + tableName + " VALUES(?, ?, ?)"); + for (int i = 0; i < rows; i++) { + stmt.setInt(1, i + 1); + stmt.setInt(2, rows - i); + stmt.setInt(3, rows + i); + stmt.execute(); + } + conn.commit(); + conn.createStatement().execute("UPDATE STATISTICS " + tableName); + return tableName; + } + } }
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b40a36b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java index f3c4c24..f5a7e39 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java @@ -71,6 +71,8 @@ import org.apache.phoenix.parse.TableNodeVisitor; import org.apache.phoenix.parse.TableWildcardParseNode; import org.apache.phoenix.parse.UDFParseNode; import org.apache.phoenix.parse.WildcardParseNode; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.ColumnNotFoundException; import org.apache.phoenix.schema.ColumnRef; import org.apache.phoenix.schema.LocalIndexDataColumnRef; @@ -124,6 +126,8 @@ public class JoinCompiler { private final ColumnResolver origResolver; private final boolean useStarJoin; private final Map<ColumnRef, ColumnRefType> columnRefs; + private final boolean useSortMergeJoin; + private final boolean costBased; private JoinCompiler(PhoenixStatement statement, SelectStatement select, ColumnResolver resolver) { @@ -132,6 +136,9 @@ public class JoinCompiler { this.origResolver = resolver; this.useStarJoin = !select.getHint().hasHint(Hint.NO_STAR_JOIN); this.columnRefs = new HashMap<ColumnRef, ColumnRefType>(); + this.useSortMergeJoin = select.getHint().hasHint(Hint.USE_SORT_MERGE_JOIN); + this.costBased = statement.getConnection().getQueryServices().getProps().getBoolean( + QueryServices.COST_BASED_OPTIMIZER_ENABLED, QueryServicesOptions.DEFAULT_COST_BASED_OPTIMIZER_ENABLED); } public static JoinTable compile(PhoenixStatement statement, SelectStatement select, ColumnResolver resolver) throws SQLException {
@@ -365,6 +372,42 @@ public class JoinCompiler { } /** + * Returns a list of all applicable join strategies. The order of the strategies in the + * returned list is based on the static rule below. However, the caller can decide on + * an optimal join strategy by evaluating and comparing the costs. + * 1. If hint USE_SORT_MERGE_JOIN is specified, + * return a singleton list containing only SORT_MERGE. + * 2. If 1) matches pattern "A LEFT/INNER/SEMI/ANTI JOIN B"; or + * 2) matches pattern "A LEFT/INNER/SEMI/ANTI JOIN B (LEFT/INNER/SEMI/ANTI JOIN C)+" + * and hint NO_STAR_JOIN is not specified, + * add HASH_BUILD_RIGHT to the returned list. + * 3. If matches pattern "A RIGHT/INNER JOIN B", where B is either a named table reference + * or a flat sub-query, + * add HASH_BUILD_LEFT to the returned list. + * 4. Add SORT_MERGE to the returned list. + */ + public List<Strategy> getApplicableJoinStrategies() { + List<Strategy> strategies = Lists.newArrayList(); + if (useSortMergeJoin) { + strategies.add(Strategy.SORT_MERGE); + } else { + if (getStarJoinVector() != null) { + strategies.add(Strategy.HASH_BUILD_RIGHT); + } + JoinSpec lastJoinSpec = joinSpecs.get(joinSpecs.size() - 1); + JoinType type = lastJoinSpec.getType(); + if ((type == JoinType.Right || type == JoinType.Inner) + && lastJoinSpec.getJoinTable().getJoinSpecs().isEmpty() + && lastJoinSpec.getJoinTable().getTable().isFlat()) { + strategies.add(Strategy.HASH_BUILD_LEFT); + } + strategies.add(Strategy.SORT_MERGE); + } + + return strategies; + } + + /** * Returns a boolean vector indicating whether the evaluation of join expressions * can be evaluated at an early stage if the input JoinSpec can be taken as a * star join. Otherwise returns null.
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b40a36b/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java index 0688b94..e3ed110 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java @@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.phoenix.compile.GroupByCompiler.GroupBy; import org.apache.phoenix.compile.OrderByCompiler.OrderBy; +import org.apache.phoenix.execute.visitor.QueryPlanVisitor; import org.apache.phoenix.expression.Determinism; import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.LiteralExpression; @@ -251,7 +252,12 @@ public class ListJarsQueryPlan implements QueryPlan { return false; } - @Override + @Override + public <T> T accept(QueryPlanVisitor<T> visitor) { + return visitor.visit(this); + } + + @Override public Set<TableRef> getSourceRefs() { return Collections.<TableRef>emptySet(); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b40a36b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java index 3b14850..243f03e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java @@ -24,6 +24,7 @@ import java.util.Collections; import java.util.List; import java.util.Set; +import 
org.apache.hadoop.hbase.client.Query; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.util.Pair; import org.apache.phoenix.compile.GroupByCompiler.GroupBy; @@ -52,6 +53,7 @@ import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.jdbc.PhoenixStatement; import org.apache.phoenix.join.HashJoinInfo; +import org.apache.phoenix.optimize.Cost; import org.apache.phoenix.parse.AliasedNode; import org.apache.phoenix.parse.EqualParseNode; import org.apache.phoenix.parse.HintNode.Hint; @@ -63,7 +65,10 @@ import org.apache.phoenix.parse.SQLParser; import org.apache.phoenix.parse.SelectStatement; import org.apache.phoenix.parse.SubqueryParseNode; import org.apache.phoenix.parse.TableNode; +import org.apache.phoenix.query.ConnectionQueryServices; import org.apache.phoenix.query.QueryConstants; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.AmbiguousColumnException; import org.apache.phoenix.schema.ColumnNotFoundException; import org.apache.phoenix.schema.PDatum; @@ -102,9 +107,9 @@ public class QueryCompiler { private final ParallelIteratorFactory parallelIteratorFactory; private final SequenceManager sequenceManager; private final boolean projectTuples; - private final boolean useSortMergeJoin; private final boolean noChildParentJoinOptimization; private final QueryPlan dataPlan; + private final boolean costBased; public QueryCompiler(PhoenixStatement statement, SelectStatement select, ColumnResolver resolver, boolean projectTuples, QueryPlan dataPlan) throws SQLException { this(statement, select, resolver, Collections.<PDatum>emptyList(), null, new SequenceManager(statement), projectTuples, dataPlan); @@ -119,9 +124,10 @@ public class QueryCompiler { this.parallelIteratorFactory = parallelIteratorFactory; this.sequenceManager = sequenceManager; this.projectTuples = projectTuples; - 
this.useSortMergeJoin = select.getHint().hasHint(Hint.USE_SORT_MERGE_JOIN); this.noChildParentJoinOptimization = select.getHint().hasHint(Hint.NO_CHILD_PARENT_JOIN_OPTIMIZATION); - if (statement.getConnection().getQueryServices().getLowestClusterHBaseVersion() >= PhoenixDatabaseMetaData.ESSENTIAL_FAMILY_VERSION_THRESHOLD) { + ConnectionQueryServices services = statement.getConnection().getQueryServices(); + this.costBased = services.getProps().getBoolean(QueryServices.COST_BASED_OPTIMIZER_ENABLED, QueryServicesOptions.DEFAULT_COST_BASED_OPTIMIZER_ENABLED); + if (services.getLowestClusterHBaseVersion() >= PhoenixDatabaseMetaData.ESSENTIAL_FAMILY_VERSION_THRESHOLD) { this.scan.setAttribute(LOAD_COLUMN_FAMILIES_ON_DEMAND_ATTR, QueryConstants.TRUE); } if (select.getHint().hasHint(Hint.NO_CACHE)) { @@ -201,41 +207,17 @@ public class QueryCompiler { } } - /* + /** * Call compileJoinQuery() for join queries recursively down to the leaf JoinTable nodes. - * This matches the input JoinTable node against patterns in the following order: - * 1. A (leaf JoinTable node, which can be a named table reference or a subquery of any kind.) - * Returns the compilation result of a single table scan or of an independent subquery. - * 2. Matching either of (when hint USE_SORT_MERGE_JOIN not specified): - * 1) A LEFT/INNER JOIN B - * 2) A LEFT/INNER JOIN B (LEFT/INNER JOIN C)+, if hint NO_STAR_JOIN not specified - * where A can be a named table reference or a flat subquery, and B, C, ... can be a named - * table reference, a sub-join or a subquery of any kind. - * Returns a HashJoinPlan{scan: A, hash: B, C, ...}. - * 3. Matching pattern: - * A RIGHT/INNER JOIN B (when hint USE_SORT_MERGE_JOIN not specified) - * where B can be a named table reference or a flat subquery, and A can be a named table - * reference, a sub-join or a subquery of any kind. - * Returns a HashJoinPlan{scan: B, hash: A}. 
- * NOTE that "A LEFT/RIGHT/INNER/FULL JOIN B RIGHT/INNER JOIN C" is viewed as - * "(A LEFT/RIGHT/INNER/FULL JOIN B) RIGHT/INNER JOIN C" here, which means the left part in the - * parenthesis is considered a sub-join. - * viewed as a sub-join. - * 4. All the rest that do not qualify for previous patterns or conditions, including FULL joins. - * Returns a SortMergeJoinPlan, the sorting part of which is pushed down to the JoinTable nodes - * of both sides as order-by clauses. - * NOTE that SEMI or ANTI joins are treated the same way as LEFT joins in JoinTable pattern matching. - * - * If no join algorithm hint is provided, according to the above compilation process, a join query - * plan can probably consist of both HashJoinPlan and SortMergeJoinPlan which may enclose each other. - * TODO 1) Use table statistics to guide the choice of join plans. - * 2) Make it possible to hint a certain join algorithm for a specific join step. + * If it is a leaf node, call compileSingleFlatQuery() or compileSubquery(), otherwise: + * 1) If option COST_BASED_OPTIMIZER_ENABLED is on and stats are available, return the + * join plan with the best cost. Note that the "best" plan is only locally optimal, + * and might or might not be globally optimal. + * 2) Otherwise, return the join plan compiled with the default strategy. 
+ * @see JoinCompiler.JoinTable#getApplicableJoinStrategies() */ - @SuppressWarnings("unchecked") protected QueryPlan compileJoinQuery(StatementContext context, List<Object> binds, JoinTable joinTable, boolean asSubquery, boolean projectPKColumns, List<OrderByNode> orderBy) throws SQLException { - byte[] emptyByteArray = new byte[0]; - List<JoinSpec> joinSpecs = joinTable.getJoinSpecs(); - if (joinSpecs.isEmpty()) { + if (joinTable.getJoinSpecs().isEmpty()) { Table table = joinTable.getTable(); SelectStatement subquery = table.getAsSubquery(orderBy); if (!table.isSubselect()) { @@ -253,198 +235,229 @@ public class QueryCompiler { return new TupleProjectionPlan(plan, new TupleProjector(plan.getProjector()), table.compilePostFilterExpression(context)); } - boolean[] starJoinVector; - if (!this.useSortMergeJoin && (starJoinVector = joinTable.getStarJoinVector()) != null) { - Table table = joinTable.getTable(); - PTable initialProjectedTable; - TableRef tableRef; - SelectStatement query; - TupleProjector tupleProjector; - if (!table.isSubselect()) { - context.setCurrentTable(table.getTableRef()); - initialProjectedTable = table.createProjectedTable(!projectPKColumns, context); - tableRef = table.getTableRef(); - table.projectColumns(context.getScan()); - query = joinTable.getAsSingleSubquery(table.getAsSubquery(orderBy), asSubquery); - tupleProjector = new TupleProjector(initialProjectedTable); - } else { - SelectStatement subquery = table.getAsSubquery(orderBy); - QueryPlan plan = compileSubquery(subquery, false); - initialProjectedTable = table.createProjectedTable(plan.getProjector()); - tableRef = plan.getTableRef(); - context.getScan().setFamilyMap(plan.getContext().getScan().getFamilyMap()); - query = joinTable.getAsSingleSubquery((SelectStatement) plan.getStatement(), asSubquery); - tupleProjector = new TupleProjector(plan.getProjector()); + List<JoinCompiler.Strategy> strategies = joinTable.getApplicableJoinStrategies(); + assert strategies.size() > 0; + if 
(!costBased || strategies.size() == 1) { + return compileJoinQuery( + strategies.get(0), context, binds, joinTable, asSubquery, projectPKColumns, orderBy); + } + + QueryPlan bestPlan = null; + Cost bestCost = null; + for (JoinCompiler.Strategy strategy : strategies) { + StatementContext newContext = new StatementContext( + context.getStatement(), context.getResolver(), new Scan(), context.getSequenceManager()); + QueryPlan plan = compileJoinQuery( + strategy, newContext, binds, joinTable, asSubquery, projectPKColumns, orderBy); + Cost cost = plan.getCost(); + if (bestPlan == null || cost.compareTo(bestCost) < 0) { + bestPlan = plan; + bestCost = cost; } - context.setCurrentTable(tableRef); - PTable projectedTable = initialProjectedTable; - int count = joinSpecs.size(); - ImmutableBytesPtr[] joinIds = new ImmutableBytesPtr[count]; - List<Expression>[] joinExpressions = new List[count]; - JoinType[] joinTypes = new JoinType[count]; - PTable[] tables = new PTable[count]; - int[] fieldPositions = new int[count]; - StatementContext[] subContexts = new StatementContext[count]; - QueryPlan[] subPlans = new QueryPlan[count]; - HashSubPlan[] hashPlans = new HashSubPlan[count]; - fieldPositions[0] = projectedTable.getColumns().size() - projectedTable.getPKColumns().size(); - for (int i = 0; i < count; i++) { - JoinSpec joinSpec = joinSpecs.get(i); - Scan subScan = ScanUtil.newScan(originalScan); - subContexts[i] = new StatementContext(statement, context.getResolver(), subScan, new SequenceManager(statement)); - subPlans[i] = compileJoinQuery(subContexts[i], binds, joinSpec.getJoinTable(), true, true, null); - boolean hasPostReference = joinSpec.getJoinTable().hasPostReference(); - if (hasPostReference) { - tables[i] = subContexts[i].getResolver().getTables().get(0).getTable(); - projectedTable = JoinCompiler.joinProjectedTables(projectedTable, tables[i], joinSpec.getType()); + } + context.setResolver(bestPlan.getContext().getResolver()); + 
context.setCurrentTable(bestPlan.getContext().getCurrentTable()); + return bestPlan; + } + + protected QueryPlan compileJoinQuery(JoinCompiler.Strategy strategy, StatementContext context, List<Object> binds, JoinTable joinTable, boolean asSubquery, boolean projectPKColumns, List<OrderByNode> orderBy) throws SQLException { + byte[] emptyByteArray = new byte[0]; + List<JoinSpec> joinSpecs = joinTable.getJoinSpecs(); + switch (strategy) { + case HASH_BUILD_RIGHT: { + boolean[] starJoinVector = joinTable.getStarJoinVector(); + Table table = joinTable.getTable(); + PTable initialProjectedTable; + TableRef tableRef; + SelectStatement query; + TupleProjector tupleProjector; + if (!table.isSubselect()) { + context.setCurrentTable(table.getTableRef()); + initialProjectedTable = table.createProjectedTable(!projectPKColumns, context); + tableRef = table.getTableRef(); + table.projectColumns(context.getScan()); + query = joinTable.getAsSingleSubquery(table.getAsSubquery(orderBy), asSubquery); + tupleProjector = new TupleProjector(initialProjectedTable); } else { - tables[i] = null; + SelectStatement subquery = table.getAsSubquery(orderBy); + QueryPlan plan = compileSubquery(subquery, false); + initialProjectedTable = table.createProjectedTable(plan.getProjector()); + tableRef = plan.getTableRef(); + context.getScan().setFamilyMap(plan.getContext().getScan().getFamilyMap()); + query = joinTable.getAsSingleSubquery((SelectStatement) plan.getStatement(), asSubquery); + tupleProjector = new TupleProjector(plan.getProjector()); } - } - for (int i = 0; i < count; i++) { - JoinSpec joinSpec = joinSpecs.get(i); - context.setResolver(FromCompiler.getResolverForProjectedTable(projectedTable, context.getConnection(), query.getUdfParseNodes())); - joinIds[i] = new ImmutableBytesPtr(emptyByteArray); // place-holder - Pair<List<Expression>, List<Expression>> joinConditions = joinSpec.compileJoinConditions(context, subContexts[i], JoinCompiler.Strategy.HASH_BUILD_RIGHT); - joinExpressions[i] 
= joinConditions.getFirst(); - List<Expression> hashExpressions = joinConditions.getSecond(); - Pair<Expression, Expression> keyRangeExpressions = new Pair<Expression, Expression>(null, null); - boolean optimized = getKeyExpressionCombinations(keyRangeExpressions, context, joinTable.getStatement(), tableRef, joinSpec.getType(), joinExpressions[i], hashExpressions); - Expression keyRangeLhsExpression = keyRangeExpressions.getFirst(); - Expression keyRangeRhsExpression = keyRangeExpressions.getSecond(); - joinTypes[i] = joinSpec.getType(); - if (i < count - 1) { - fieldPositions[i + 1] = fieldPositions[i] + (tables[i] == null ? 0 : (tables[i].getColumns().size() - tables[i].getPKColumns().size())); + context.setCurrentTable(tableRef); + PTable projectedTable = initialProjectedTable; + int count = joinSpecs.size(); + ImmutableBytesPtr[] joinIds = new ImmutableBytesPtr[count]; + List<Expression>[] joinExpressions = new List[count]; + JoinType[] joinTypes = new JoinType[count]; + PTable[] tables = new PTable[count]; + int[] fieldPositions = new int[count]; + StatementContext[] subContexts = new StatementContext[count]; + QueryPlan[] subPlans = new QueryPlan[count]; + HashSubPlan[] hashPlans = new HashSubPlan[count]; + fieldPositions[0] = projectedTable.getColumns().size() - projectedTable.getPKColumns().size(); + for (int i = 0; i < count; i++) { + JoinSpec joinSpec = joinSpecs.get(i); + Scan subScan = ScanUtil.newScan(originalScan); + subContexts[i] = new StatementContext(statement, context.getResolver(), subScan, new SequenceManager(statement)); + subPlans[i] = compileJoinQuery(subContexts[i], binds, joinSpec.getJoinTable(), true, true, null); + boolean hasPostReference = joinSpec.getJoinTable().hasPostReference(); + if (hasPostReference) { + tables[i] = subContexts[i].getResolver().getTables().get(0).getTable(); + projectedTable = JoinCompiler.joinProjectedTables(projectedTable, tables[i], joinSpec.getType()); + } else { + tables[i] = null; + } + } + for (int i = 0; 
i < count; i++) { + JoinSpec joinSpec = joinSpecs.get(i); + context.setResolver(FromCompiler.getResolverForProjectedTable(projectedTable, context.getConnection(), query.getUdfParseNodes())); + joinIds[i] = new ImmutableBytesPtr(emptyByteArray); // place-holder + Pair<List<Expression>, List<Expression>> joinConditions = joinSpec.compileJoinConditions(context, subContexts[i], strategy); + joinExpressions[i] = joinConditions.getFirst(); + List<Expression> hashExpressions = joinConditions.getSecond(); + Pair<Expression, Expression> keyRangeExpressions = new Pair<Expression, Expression>(null, null); + boolean optimized = getKeyExpressionCombinations(keyRangeExpressions, context, joinTable.getStatement(), tableRef, joinSpec.getType(), joinExpressions[i], hashExpressions); + Expression keyRangeLhsExpression = keyRangeExpressions.getFirst(); + Expression keyRangeRhsExpression = keyRangeExpressions.getSecond(); + joinTypes[i] = joinSpec.getType(); + if (i < count - 1) { + fieldPositions[i + 1] = fieldPositions[i] + (tables[i] == null ? 0 : (tables[i].getColumns().size() - tables[i].getPKColumns().size())); + } + hashPlans[i] = new HashSubPlan(i, subPlans[i], optimized ? null : hashExpressions, joinSpec.isSingleValueOnly(), keyRangeLhsExpression, keyRangeRhsExpression); + } + TupleProjector.serializeProjectorIntoScan(context.getScan(), tupleProjector); + QueryPlan plan = compileSingleFlatQuery(context, query, binds, asSubquery, !asSubquery && joinTable.isAllLeftJoin(), null, !table.isSubselect() && projectPKColumns ? tupleProjector : null, true); + Expression postJoinFilterExpression = joinTable.compilePostFilterExpression(context, table); + Integer limit = null; + Integer offset = null; + if (!query.isAggregate() && !query.isDistinct() && query.getOrderBy().isEmpty()) { + limit = plan.getLimit(); + offset = plan.getOffset(); } - hashPlans[i] = new HashSubPlan(i, subPlans[i], optimized ? 
null : hashExpressions, joinSpec.isSingleValueOnly(), keyRangeLhsExpression, keyRangeRhsExpression); + HashJoinInfo joinInfo = new HashJoinInfo(projectedTable, joinIds, joinExpressions, joinTypes, + starJoinVector, tables, fieldPositions, postJoinFilterExpression, QueryUtil.getOffsetLimit(limit, offset)); + return HashJoinPlan.create(joinTable.getStatement(), plan, joinInfo, hashPlans); } - TupleProjector.serializeProjectorIntoScan(context.getScan(), tupleProjector); - QueryPlan plan = compileSingleFlatQuery(context, query, binds, asSubquery, !asSubquery && joinTable.isAllLeftJoin(), null, !table.isSubselect() && projectPKColumns ? tupleProjector : null, true); - Expression postJoinFilterExpression = joinTable.compilePostFilterExpression(context, table); - Integer limit = null; - Integer offset = null; - if (!query.isAggregate() && !query.isDistinct() && query.getOrderBy().isEmpty()) { - limit = plan.getLimit(); - offset = plan.getOffset(); + case HASH_BUILD_LEFT: { + JoinSpec lastJoinSpec = joinSpecs.get(joinSpecs.size() - 1); + JoinType type = lastJoinSpec.getType(); + JoinTable rhsJoinTable = lastJoinSpec.getJoinTable(); + Table rhsTable = rhsJoinTable.getTable(); + JoinTable lhsJoin = joinTable.getSubJoinTableWithoutPostFilters(); + Scan subScan = ScanUtil.newScan(originalScan); + StatementContext lhsCtx = new StatementContext(statement, context.getResolver(), subScan, new SequenceManager(statement)); + QueryPlan lhsPlan = compileJoinQuery(lhsCtx, binds, lhsJoin, true, true, null); + PTable rhsProjTable; + TableRef rhsTableRef; + SelectStatement rhs; + TupleProjector tupleProjector; + if (!rhsTable.isSubselect()) { + context.setCurrentTable(rhsTable.getTableRef()); + rhsProjTable = rhsTable.createProjectedTable(!projectPKColumns, context); + rhsTableRef = rhsTable.getTableRef(); + rhsTable.projectColumns(context.getScan()); + rhs = rhsJoinTable.getAsSingleSubquery(rhsTable.getAsSubquery(orderBy), asSubquery); + tupleProjector = new TupleProjector(rhsProjTable); 
+ } else { + SelectStatement subquery = rhsTable.getAsSubquery(orderBy); + QueryPlan plan = compileSubquery(subquery, false); + rhsProjTable = rhsTable.createProjectedTable(plan.getProjector()); + rhsTableRef = plan.getTableRef(); + context.getScan().setFamilyMap(plan.getContext().getScan().getFamilyMap()); + rhs = rhsJoinTable.getAsSingleSubquery((SelectStatement) plan.getStatement(), asSubquery); + tupleProjector = new TupleProjector(plan.getProjector()); + } + context.setCurrentTable(rhsTableRef); + context.setResolver(FromCompiler.getResolverForProjectedTable(rhsProjTable, context.getConnection(), rhs.getUdfParseNodes())); + ImmutableBytesPtr[] joinIds = new ImmutableBytesPtr[]{new ImmutableBytesPtr(emptyByteArray)}; + Pair<List<Expression>, List<Expression>> joinConditions = lastJoinSpec.compileJoinConditions(lhsCtx, context, strategy); + List<Expression> joinExpressions = joinConditions.getSecond(); + List<Expression> hashExpressions = joinConditions.getFirst(); + boolean needsMerge = lhsJoin.hasPostReference(); + PTable lhsTable = needsMerge ? lhsCtx.getResolver().getTables().get(0).getTable() : null; + int fieldPosition = needsMerge ? rhsProjTable.getColumns().size() - rhsProjTable.getPKColumns().size() : 0; + PTable projectedTable = needsMerge ? JoinCompiler.joinProjectedTables(rhsProjTable, lhsTable, type == JoinType.Right ? JoinType.Left : type) : rhsProjTable; + TupleProjector.serializeProjectorIntoScan(context.getScan(), tupleProjector); + context.setResolver(FromCompiler.getResolverForProjectedTable(projectedTable, context.getConnection(), rhs.getUdfParseNodes())); + QueryPlan rhsPlan = compileSingleFlatQuery(context, rhs, binds, asSubquery, !asSubquery && type == JoinType.Right, null, !rhsTable.isSubselect() && projectPKColumns ? 
tupleProjector : null, true); + Expression postJoinFilterExpression = joinTable.compilePostFilterExpression(context, rhsTable); + Integer limit = null; + Integer offset = null; + if (!rhs.isAggregate() && !rhs.isDistinct() && rhs.getOrderBy().isEmpty()) { + limit = rhsPlan.getLimit(); + offset = rhsPlan.getOffset(); + } + HashJoinInfo joinInfo = new HashJoinInfo(projectedTable, joinIds, new List[]{joinExpressions}, + new JoinType[]{type == JoinType.Right ? JoinType.Left : type}, new boolean[]{true}, + new PTable[]{lhsTable}, new int[]{fieldPosition}, postJoinFilterExpression, QueryUtil.getOffsetLimit(limit, offset)); + Pair<Expression, Expression> keyRangeExpressions = new Pair<Expression, Expression>(null, null); + getKeyExpressionCombinations(keyRangeExpressions, context, joinTable.getStatement(), rhsTableRef, type, joinExpressions, hashExpressions); + return HashJoinPlan.create(joinTable.getStatement(), rhsPlan, joinInfo, new HashSubPlan[]{new HashSubPlan(0, lhsPlan, hashExpressions, false, keyRangeExpressions.getFirst(), keyRangeExpressions.getSecond())}); } - HashJoinInfo joinInfo = new HashJoinInfo(projectedTable, joinIds, joinExpressions, joinTypes, - starJoinVector, tables, fieldPositions, postJoinFilterExpression, QueryUtil.getOffsetLimit(limit, offset)); - return HashJoinPlan.create(joinTable.getStatement(), plan, joinInfo, hashPlans); - } + case SORT_MERGE: { + JoinTable lhsJoin = joinTable.getSubJoinTableWithoutPostFilters(); + JoinSpec lastJoinSpec = joinSpecs.get(joinSpecs.size() - 1); + JoinType type = lastJoinSpec.getType(); + JoinTable rhsJoin = lastJoinSpec.getJoinTable(); + if (type == JoinType.Right) { + JoinTable temp = lhsJoin; + lhsJoin = rhsJoin; + rhsJoin = temp; + } - JoinSpec lastJoinSpec = joinSpecs.get(joinSpecs.size() - 1); - JoinType type = lastJoinSpec.getType(); - if (!this.useSortMergeJoin - && (type == JoinType.Right || type == JoinType.Inner) - && lastJoinSpec.getJoinTable().getJoinSpecs().isEmpty() - && 
lastJoinSpec.getJoinTable().getTable().isFlat()) { - JoinTable rhsJoinTable = lastJoinSpec.getJoinTable(); - Table rhsTable = rhsJoinTable.getTable(); - JoinTable lhsJoin = joinTable.getSubJoinTableWithoutPostFilters(); - Scan subScan = ScanUtil.newScan(originalScan); - StatementContext lhsCtx = new StatementContext(statement, context.getResolver(), subScan, new SequenceManager(statement)); - QueryPlan lhsPlan = compileJoinQuery(lhsCtx, binds, lhsJoin, true, true, null); - PTable rhsProjTable; - TableRef rhsTableRef; - SelectStatement rhs; - TupleProjector tupleProjector; - if (!rhsTable.isSubselect()) { - context.setCurrentTable(rhsTable.getTableRef()); - rhsProjTable = rhsTable.createProjectedTable(!projectPKColumns, context); - rhsTableRef = rhsTable.getTableRef(); - rhsTable.projectColumns(context.getScan()); - rhs = rhsJoinTable.getAsSingleSubquery(rhsTable.getAsSubquery(orderBy), asSubquery); - tupleProjector = new TupleProjector(rhsProjTable); - } else { - SelectStatement subquery = rhsTable.getAsSubquery(orderBy); - QueryPlan plan = compileSubquery(subquery, false); - rhsProjTable = rhsTable.createProjectedTable(plan.getProjector()); - rhsTableRef = plan.getTableRef(); - context.getScan().setFamilyMap(plan.getContext().getScan().getFamilyMap()); - rhs = rhsJoinTable.getAsSingleSubquery((SelectStatement) plan.getStatement(), asSubquery); - tupleProjector = new TupleProjector(plan.getProjector()); - } - context.setCurrentTable(rhsTableRef); - context.setResolver(FromCompiler.getResolverForProjectedTable(rhsProjTable, context.getConnection(), rhs.getUdfParseNodes())); - ImmutableBytesPtr[] joinIds = new ImmutableBytesPtr[] {new ImmutableBytesPtr(emptyByteArray)}; - Pair<List<Expression>, List<Expression>> joinConditions = lastJoinSpec.compileJoinConditions(lhsCtx, context, JoinCompiler.Strategy.HASH_BUILD_LEFT); - List<Expression> joinExpressions = joinConditions.getSecond(); - List<Expression> hashExpressions = joinConditions.getFirst(); - boolean needsMerge 
= lhsJoin.hasPostReference(); - PTable lhsTable = needsMerge ? lhsCtx.getResolver().getTables().get(0).getTable() : null; - int fieldPosition = needsMerge ? rhsProjTable.getColumns().size() - rhsProjTable.getPKColumns().size() : 0; - PTable projectedTable = needsMerge ? JoinCompiler.joinProjectedTables(rhsProjTable, lhsTable, type == JoinType.Right ? JoinType.Left : type) : rhsProjTable; - TupleProjector.serializeProjectorIntoScan(context.getScan(), tupleProjector); - context.setResolver(FromCompiler.getResolverForProjectedTable(projectedTable, context.getConnection(), rhs.getUdfParseNodes())); - QueryPlan rhsPlan = compileSingleFlatQuery(context, rhs, binds, asSubquery, !asSubquery && type == JoinType.Right, null, !rhsTable.isSubselect() && projectPKColumns ? tupleProjector : null, true); - Expression postJoinFilterExpression = joinTable.compilePostFilterExpression(context, rhsTable); - Integer limit = null; - Integer offset = null; - if (!rhs.isAggregate() && !rhs.isDistinct() && rhs.getOrderBy().isEmpty()) { - limit = rhsPlan.getLimit(); - offset = rhsPlan.getOffset(); - } - HashJoinInfo joinInfo = new HashJoinInfo(projectedTable, joinIds, new List[] { joinExpressions }, - new JoinType[] { type == JoinType.Right ? 
JoinType.Left : type }, new boolean[] { true }, - new PTable[] { lhsTable }, new int[] { fieldPosition }, postJoinFilterExpression, QueryUtil.getOffsetLimit(limit, offset)); - Pair<Expression, Expression> keyRangeExpressions = new Pair<Expression, Expression>(null, null); - getKeyExpressionCombinations(keyRangeExpressions, context, joinTable.getStatement(), rhsTableRef, type, joinExpressions, hashExpressions); - return HashJoinPlan.create(joinTable.getStatement(), rhsPlan, joinInfo, new HashSubPlan[] {new HashSubPlan(0, lhsPlan, hashExpressions, false, keyRangeExpressions.getFirst(), keyRangeExpressions.getSecond())}); - } + List<EqualParseNode> joinConditionNodes = lastJoinSpec.getOnConditions(); + List<OrderByNode> lhsOrderBy = Lists.<OrderByNode>newArrayListWithExpectedSize(joinConditionNodes.size()); + List<OrderByNode> rhsOrderBy = Lists.<OrderByNode>newArrayListWithExpectedSize(joinConditionNodes.size()); + for (EqualParseNode condition : joinConditionNodes) { + lhsOrderBy.add(NODE_FACTORY.orderBy(type == JoinType.Right ? condition.getRHS() : condition.getLHS(), false, true)); + rhsOrderBy.add(NODE_FACTORY.orderBy(type == JoinType.Right ? condition.getLHS() : condition.getRHS(), false, true)); + } - JoinTable lhsJoin = joinTable.getSubJoinTableWithoutPostFilters(); - JoinTable rhsJoin = lastJoinSpec.getJoinTable(); - if (type == JoinType.Right) { - JoinTable temp = lhsJoin; - lhsJoin = rhsJoin; - rhsJoin = temp; - } - - List<EqualParseNode> joinConditionNodes = lastJoinSpec.getOnConditions(); - List<OrderByNode> lhsOrderBy = Lists.<OrderByNode> newArrayListWithExpectedSize(joinConditionNodes.size()); - List<OrderByNode> rhsOrderBy = Lists.<OrderByNode> newArrayListWithExpectedSize(joinConditionNodes.size()); - for (EqualParseNode condition : joinConditionNodes) { - lhsOrderBy.add(NODE_FACTORY.orderBy(type == JoinType.Right ? condition.getRHS() : condition.getLHS(), false, true)); - rhsOrderBy.add(NODE_FACTORY.orderBy(type == JoinType.Right ? 
condition.getLHS() : condition.getRHS(), false, true)); - } - - Scan lhsScan = ScanUtil.newScan(originalScan); - StatementContext lhsCtx = new StatementContext(statement, context.getResolver(), lhsScan, new SequenceManager(statement)); - boolean preserveRowkey = !projectPKColumns && type != JoinType.Full; - QueryPlan lhsPlan = compileJoinQuery(lhsCtx, binds, lhsJoin, true, !preserveRowkey, lhsOrderBy); - PTable lhsProjTable = lhsCtx.getResolver().getTables().get(0).getTable(); - boolean isInRowKeyOrder = preserveRowkey && lhsPlan.getOrderBy().getOrderByExpressions().isEmpty(); - - Scan rhsScan = ScanUtil.newScan(originalScan); - StatementContext rhsCtx = new StatementContext(statement, context.getResolver(), rhsScan, new SequenceManager(statement)); - QueryPlan rhsPlan = compileJoinQuery(rhsCtx, binds, rhsJoin, true, true, rhsOrderBy); - PTable rhsProjTable = rhsCtx.getResolver().getTables().get(0).getTable(); - - Pair<List<Expression>, List<Expression>> joinConditions = lastJoinSpec.compileJoinConditions(type == JoinType.Right ? rhsCtx : lhsCtx, type == JoinType.Right ? lhsCtx : rhsCtx, JoinCompiler.Strategy.SORT_MERGE); - List<Expression> lhsKeyExpressions = type == JoinType.Right ? joinConditions.getSecond() : joinConditions.getFirst(); - List<Expression> rhsKeyExpressions = type == JoinType.Right ? joinConditions.getFirst() : joinConditions.getSecond(); - - boolean needsMerge = rhsJoin.hasPostReference(); - int fieldPosition = needsMerge ? lhsProjTable.getColumns().size() - lhsProjTable.getPKColumns().size() : 0; - PTable projectedTable = needsMerge ? JoinCompiler.joinProjectedTables(lhsProjTable, rhsProjTable, type == JoinType.Right ? 
JoinType.Left : type) : lhsProjTable; - - ColumnResolver resolver = FromCompiler.getResolverForProjectedTable(projectedTable, context.getConnection(), joinTable.getStatement().getUdfParseNodes()); - TableRef tableRef = resolver.getTables().get(0); - StatementContext subCtx = new StatementContext(statement, resolver, ScanUtil.newScan(originalScan), new SequenceManager(statement)); - subCtx.setCurrentTable(tableRef); - QueryPlan innerPlan = new SortMergeJoinPlan(subCtx, joinTable.getStatement(), tableRef, type == JoinType.Right ? JoinType.Left : type, lhsPlan, rhsPlan, lhsKeyExpressions, rhsKeyExpressions, projectedTable, lhsProjTable, needsMerge ? rhsProjTable : null, fieldPosition, lastJoinSpec.isSingleValueOnly()); - context.setCurrentTable(tableRef); - context.setResolver(resolver); - TableNode from = NODE_FACTORY.namedTable(tableRef.getTableAlias(), NODE_FACTORY.table(tableRef.getTable().getSchemaName().getString(), tableRef.getTable().getTableName().getString())); - ParseNode where = joinTable.getPostFiltersCombined(); - SelectStatement select = asSubquery - ? 
NODE_FACTORY.select(from, joinTable.getStatement().getHint(), false, - Collections.<AliasedNode> emptyList(), where, null, null, orderBy, null, null, 0, false, - joinTable.getStatement().hasSequence(), Collections.<SelectStatement> emptyList(), + Scan lhsScan = ScanUtil.newScan(originalScan); + StatementContext lhsCtx = new StatementContext(statement, context.getResolver(), lhsScan, new SequenceManager(statement)); + boolean preserveRowkey = !projectPKColumns && type != JoinType.Full; + QueryPlan lhsPlan = compileJoinQuery(lhsCtx, binds, lhsJoin, true, !preserveRowkey, lhsOrderBy); + PTable lhsProjTable = lhsCtx.getResolver().getTables().get(0).getTable(); + boolean isInRowKeyOrder = preserveRowkey && lhsPlan.getOrderBy().getOrderByExpressions().isEmpty(); + + Scan rhsScan = ScanUtil.newScan(originalScan); + StatementContext rhsCtx = new StatementContext(statement, context.getResolver(), rhsScan, new SequenceManager(statement)); + QueryPlan rhsPlan = compileJoinQuery(rhsCtx, binds, rhsJoin, true, true, rhsOrderBy); + PTable rhsProjTable = rhsCtx.getResolver().getTables().get(0).getTable(); + + Pair<List<Expression>, List<Expression>> joinConditions = lastJoinSpec.compileJoinConditions(type == JoinType.Right ? rhsCtx : lhsCtx, type == JoinType.Right ? lhsCtx : rhsCtx, strategy); + List<Expression> lhsKeyExpressions = type == JoinType.Right ? joinConditions.getSecond() : joinConditions.getFirst(); + List<Expression> rhsKeyExpressions = type == JoinType.Right ? joinConditions.getFirst() : joinConditions.getSecond(); + + boolean needsMerge = rhsJoin.hasPostReference(); + int fieldPosition = needsMerge ? lhsProjTable.getColumns().size() - lhsProjTable.getPKColumns().size() : 0; + PTable projectedTable = needsMerge ? JoinCompiler.joinProjectedTables(lhsProjTable, rhsProjTable, type == JoinType.Right ? 
JoinType.Left : type) : lhsProjTable; + + ColumnResolver resolver = FromCompiler.getResolverForProjectedTable(projectedTable, context.getConnection(), joinTable.getStatement().getUdfParseNodes()); + TableRef tableRef = resolver.getTables().get(0); + StatementContext subCtx = new StatementContext(statement, resolver, ScanUtil.newScan(originalScan), new SequenceManager(statement)); + subCtx.setCurrentTable(tableRef); + QueryPlan innerPlan = new SortMergeJoinPlan(subCtx, joinTable.getStatement(), tableRef, type == JoinType.Right ? JoinType.Left : type, lhsPlan, rhsPlan, lhsKeyExpressions, rhsKeyExpressions, projectedTable, lhsProjTable, needsMerge ? rhsProjTable : null, fieldPosition, lastJoinSpec.isSingleValueOnly()); + context.setCurrentTable(tableRef); + context.setResolver(resolver); + TableNode from = NODE_FACTORY.namedTable(tableRef.getTableAlias(), NODE_FACTORY.table(tableRef.getTable().getSchemaName().getString(), tableRef.getTable().getTableName().getString())); + ParseNode where = joinTable.getPostFiltersCombined(); + SelectStatement select = asSubquery + ? 
NODE_FACTORY.select(from, joinTable.getStatement().getHint(), false, + Collections.<AliasedNode>emptyList(), where, null, null, orderBy, null, null, 0, false, + joinTable.getStatement().hasSequence(), Collections.<SelectStatement>emptyList(), joinTable.getStatement().getUdfParseNodes()) - : NODE_FACTORY.select(joinTable.getStatement(), from, where); - - return compileSingleFlatQuery(context, select, binds, asSubquery, false, innerPlan, null, isInRowKeyOrder); + : NODE_FACTORY.select(joinTable.getStatement(), from, where); + + return compileSingleFlatQuery(context, select, binds, asSubquery, false, innerPlan, null, isInRowKeyOrder); + } + default: + throw new IllegalArgumentException("Invalid join strategy '" + strategy + "'"); + } } private boolean getKeyExpressionCombinations(Pair<Expression, Expression> combination, StatementContext context, SelectStatement select, TableRef table, JoinType type, final List<Expression> joinExpressions, final List<Expression> hashExpressions) throws SQLException { http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b40a36b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java index ca88984..c2edaf3 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java @@ -24,6 +24,7 @@ import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.phoenix.compile.GroupByCompiler.GroupBy; import org.apache.phoenix.compile.OrderByCompiler.OrderBy; +import org.apache.phoenix.execute.visitor.QueryPlanVisitor; import org.apache.phoenix.iterate.ParallelScanGrouper; import org.apache.phoenix.iterate.ResultIterator; import org.apache.phoenix.optimize.Cost; @@ -90,4 
+91,5 @@ public interface QueryPlan extends StatementPlan { */ public boolean useRoundRobinIterator() throws SQLException; + <T> T accept(QueryPlanVisitor<T> visitor); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b40a36b/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java index 2714858..02aadc5 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java @@ -35,6 +35,7 @@ import org.apache.htrace.Sampler; import org.apache.htrace.TraceScope; import org.apache.phoenix.compile.GroupByCompiler.GroupBy; import org.apache.phoenix.compile.OrderByCompiler.OrderBy; +import org.apache.phoenix.execute.visitor.QueryPlanVisitor; import org.apache.phoenix.expression.Determinism; import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.LiteralExpression; @@ -270,6 +271,11 @@ public class TraceQueryPlan implements QueryPlan { } @Override + public <T> T accept(QueryPlanVisitor<T> visitor) { + return visitor.visit(this); + } + + @Override public Long getEstimatedRowsToScan() { return 0l; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b40a36b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java index 2e042e7..0c8e8dc 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java @@ -33,6 +33,10 @@ import 
org.apache.phoenix.compile.StatementContext; import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; import org.apache.phoenix.coprocessor.GroupedAggregateRegionObserver; import org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver; +import org.apache.phoenix.execute.visitor.AvgRowWidthVisitor; +import org.apache.phoenix.execute.visitor.ByteCountVisitor; +import org.apache.phoenix.execute.visitor.QueryPlanVisitor; +import org.apache.phoenix.execute.visitor.RowCountVisitor; import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.OrderByExpression; import org.apache.phoenix.expression.RowKeyExpression; @@ -117,25 +121,39 @@ public class AggregatePlan extends BaseQueryPlan { @Override public Cost getCost() { - Long byteCount = null; + Double outputBytes = this.accept(new ByteCountVisitor()); + Double rowWidth = this.accept(new AvgRowWidthVisitor()); + Long inputRows = null; try { - byteCount = getEstimatedBytesToScan(); + inputRows = getEstimatedRowsToScan(); } catch (SQLException e) { // ignored. 
} - - if (byteCount == null) { + if (inputRows == null || outputBytes == null || rowWidth == null) { return Cost.UNKNOWN; } + double inputBytes = inputRows * rowWidth; + double rowsBeforeHaving = RowCountVisitor.aggregate( + RowCountVisitor.filter( + inputRows.doubleValue(), + RowCountVisitor.stripSkipScanFilter( + context.getScan().getFilter())), + groupBy); + double rowsAfterHaving = RowCountVisitor.filter(rowsBeforeHaving, having); + double bytesBeforeHaving = rowWidth * rowsBeforeHaving; + double bytesAfterHaving = rowWidth * rowsAfterHaving; int parallelLevel = CostUtil.estimateParallelLevel( true, context.getConnection().getQueryServices()); - Cost cost = CostUtil.estimateAggregateCost(byteCount, - groupBy, aggregators.getEstimatedByteSize(), parallelLevel); + Cost cost = new Cost(0, 0, inputBytes); + Cost aggCost = CostUtil.estimateAggregateCost( + inputBytes, bytesBeforeHaving, groupBy, parallelLevel); + cost = cost.plus(aggCost); if (!orderBy.getOrderByExpressions().isEmpty()) { - double outputBytes = CostUtil.estimateAggregateOutputBytes( - byteCount, groupBy, aggregators.getEstimatedByteSize()); - Cost orderByCost = CostUtil.estimateOrderByCost(outputBytes, parallelLevel); + parallelLevel = CostUtil.estimateParallelLevel( + false, context.getConnection().getQueryServices()); + Cost orderByCost = CostUtil.estimateOrderByCost( + bytesAfterHaving, outputBytes, parallelLevel); cost = cost.plus(orderByCost); } return cost; @@ -304,4 +322,9 @@ public class AggregatePlan extends BaseQueryPlan { return false; } + @Override + public <T> T accept(QueryPlanVisitor<T> visitor) { + return visitor.visit(this); + } + }