Repository: phoenix Updated Branches: refs/heads/master 88038a2da -> 6807dacce
PHOENIX-4288 Indexes not used when ordering by primary key Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/6807dacc Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/6807dacc Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/6807dacc Branch: refs/heads/master Commit: 6807dacce7d063e14f06bc57888e7d2a5f78863a Parents: 88038a2 Author: maryannxue <maryann....@gmail.com> Authored: Tue Dec 5 10:52:46 2017 -0800 Committer: maryannxue <maryann....@gmail.com> Committed: Tue Dec 5 10:52:46 2017 -0800 ---------------------------------------------------------------------- .../phoenix/end2end/CostBasedDecisionIT.java | 466 +++++++++++++++++++ .../apache/phoenix/end2end/MutationStateIT.java | 17 + .../apache/phoenix/end2end/SystemCatalogIT.java | 17 + .../phoenix/compile/ListJarsQueryPlan.java | 6 + .../org/apache/phoenix/compile/QueryPlan.java | 5 +- .../apache/phoenix/compile/TraceQueryPlan.java | 6 + .../apache/phoenix/execute/AggregatePlan.java | 30 +- .../apache/phoenix/execute/BaseQueryPlan.java | 21 +- .../phoenix/execute/ClientAggregatePlan.java | 28 ++ .../apache/phoenix/execute/ClientScanPlan.java | 25 + .../apache/phoenix/execute/CorrelatePlan.java | 25 + .../phoenix/execute/DelegateQueryPlan.java | 6 + .../apache/phoenix/execute/HashJoinPlan.java | 29 ++ .../execute/LiteralResultIterationPlan.java | 6 + .../org/apache/phoenix/execute/ScanPlan.java | 25 + .../phoenix/execute/SortMergeJoinPlan.java | 18 + .../org/apache/phoenix/execute/UnionPlan.java | 10 + .../apache/phoenix/jdbc/PhoenixStatement.java | 6 + .../java/org/apache/phoenix/optimize/Cost.java | 123 +++++ .../apache/phoenix/optimize/QueryOptimizer.java | 30 +- .../org/apache/phoenix/query/QueryServices.java | 3 + .../phoenix/query/QueryServicesOptions.java | 4 + .../java/org/apache/phoenix/util/CostUtil.java | 90 ++++ .../query/ParallelIteratorsSplitTest.java | 6 + 24 files changed, 988 insertions(+), 14 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/6807dacc/phoenix-core/src/it/java/org/apache/phoenix/end2end/CostBasedDecisionIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CostBasedDecisionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CostBasedDecisionIT.java new file mode 100644 index 0000000..a3584ce --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CostBasedDecisionIT.java @@ -0,0 +1,466 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.end2end; + +import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; +import static org.junit.Assert.assertTrue; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.util.Map; +import java.util.Properties; + +import org.apache.phoenix.query.BaseTest; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.util.PropertiesUtil; +import org.apache.phoenix.util.QueryUtil; +import org.apache.phoenix.util.ReadOnlyProps; +import org.junit.BeforeClass; +import org.junit.Test; + +import com.google.common.collect.Maps; + +public class CostBasedDecisionIT extends BaseUniqueNamesOwnClusterIT { + + @BeforeClass + public static void doSetup() throws Exception { + Map<String, String> props = Maps.newHashMapWithExpectedSize(1); + props.put(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB, Long.toString(20)); + props.put(QueryServices.STATS_UPDATE_FREQ_MS_ATTRIB, Long.toString(5)); + props.put(QueryServices.USE_STATS_FOR_PARALLELIZATION, Boolean.toString(true)); + props.put(QueryServices.COST_BASED_OPTIMIZER_ENABLED, Boolean.toString(true)); + setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); + } + + @Test + public void testCostOverridesStaticPlanOrdering1() throws Exception { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + Connection conn = DriverManager.getConnection(getUrl(), props); + conn.setAutoCommit(true); + try { + String tableName = BaseTest.generateUniqueName(); + conn.createStatement().execute("CREATE TABLE " + tableName + " (\n" + + "rowkey VARCHAR PRIMARY KEY,\n" + + "c1 VARCHAR,\n" + + "c2 VARCHAR)"); + conn.createStatement().execute("CREATE LOCAL INDEX " + tableName + "_idx ON " + tableName + " (c1)"); + + String query = "SELECT rowkey, c1, c2 FROM " + tableName + " where c1 LIKE 'X0%' ORDER BY rowkey"; + // Without stats, expect the data table plan: ORDER BY rowkey needs no sort there since rowkey is the primary key.
+ ResultSet rs = conn.createStatement().executeQuery("explain " + query); + String plan = QueryUtil.getExplainPlan(rs); + assertTrue("Expected 'FULL SCAN' in the plan:\n" + plan + ".", + plan.contains("FULL SCAN")); + + PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " (rowkey, c1, c2) VALUES (?, ?, ?)"); + for (int i = 0; i < 10000; i++) { + int c1 = i % 16; + stmt.setString(1, "k" + i); + stmt.setString(2, "X" + Integer.toHexString(c1) + c1); + stmt.setString(3, "c"); + stmt.execute(); + } + + conn.createStatement().execute("UPDATE STATISTICS " + tableName); + + // With stats, expect the index table plan (range scan on c1), which has the lower cost. + rs = conn.createStatement().executeQuery("explain " + query); + plan = QueryUtil.getExplainPlan(rs); + assertTrue("Expected 'RANGE SCAN' in the plan:\n" + plan + ".", + plan.contains("RANGE SCAN")); + } finally { + conn.close(); + } + } + + @Test + public void testCostOverridesStaticPlanOrdering2() throws Exception { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + Connection conn = DriverManager.getConnection(getUrl(), props); + conn.setAutoCommit(true); + try { + String tableName = BaseTest.generateUniqueName(); + conn.createStatement().execute("CREATE TABLE " + tableName + " (\n" + + "rowkey VARCHAR PRIMARY KEY,\n" + + "c1 VARCHAR,\n" + + "c2 VARCHAR)"); + conn.createStatement().execute("CREATE LOCAL INDEX " + tableName + "_idx ON " + tableName + " (c1)"); + + String query = "SELECT rowkey, max(c1), max(c2) FROM " + tableName + " where c1 LIKE 'X%' GROUP BY rowkey"; + // Without stats, expect the index table plan, which turns c1 LIKE 'X%' into a range scan.
+ ResultSet rs = conn.createStatement().executeQuery("explain " + query); + String plan = QueryUtil.getExplainPlan(rs); + assertTrue("Expected 'RANGE SCAN' in the plan:\n" + plan + ".", + plan.contains("RANGE SCAN")); + + PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " (rowkey, c1, c2) VALUES (?, ?, ?)"); + for (int i = 0; i < 10000; i++) { + int c1 = i % 16; + stmt.setString(1, "k" + i); + stmt.setString(2, "X" + Integer.toHexString(c1) + c1); + stmt.setString(3, "c"); + stmt.execute(); + } + + conn.createStatement().execute("UPDATE STATISTICS " + tableName); + + // With stats, the data table plan should win on cost: every c1 value starts + // with 'X', so the range on C1 filters nothing, and GROUP BY rowkey is + // order-preserving on the data table. + rs = conn.createStatement().executeQuery("explain " + query); + plan = QueryUtil.getExplainPlan(rs); + assertTrue("Expected 'FULL SCAN' in the plan:\n" + plan + ".", + plan.contains("FULL SCAN")); + } finally { + conn.close(); + } + } + + @Test + public void testCostOverridesStaticPlanOrdering3() throws Exception { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + Connection conn = DriverManager.getConnection(getUrl(), props); + conn.setAutoCommit(true); + try { + String tableName = BaseTest.generateUniqueName(); + conn.createStatement().execute("CREATE TABLE " + tableName + " (\n" + + "rowkey VARCHAR PRIMARY KEY,\n" + + "c1 INTEGER,\n" + + "c2 INTEGER,\n" + + "c3 INTEGER)"); + conn.createStatement().execute("CREATE LOCAL INDEX " + tableName + "_idx1 ON " + tableName + " (c1) INCLUDE (c2, c3)"); + conn.createStatement().execute("CREATE LOCAL INDEX " + tableName + "_idx2 ON " + tableName + " (c2, c3) INCLUDE (c1)"); + + String query = "SELECT * FROM " + tableName + " where c1 BETWEEN 10 AND 20 AND c2 < 9000 AND C3 < 5000"; + // Without stats, expect the idx2 plan, which has a wider PK slot span.
+ ResultSet rs = conn.createStatement().executeQuery("explain " + query); + String plan = QueryUtil.getExplainPlan(rs); + String indexPlan = + "CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [2,*] - [2,9,000]\n" + + " SERVER FILTER BY ((\"C1\" >= 10 AND \"C1\" <= 20) AND TO_INTEGER(\"C3\") < 5000)\n" + + "CLIENT MERGE SORT"; + assertTrue("Expected '" + indexPlan + "' in the plan:\n" + plan + ".", + plan.contains(indexPlan)); + + PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " (rowkey, c1, c2, c3) VALUES (?, ?, ?, ?)"); + for (int i = 0; i < 10000; i++) { + stmt.setString(1, "k" + i); + stmt.setInt(2, i); + stmt.setInt(3, i); + stmt.setInt(4, i); + stmt.execute(); + } + + conn.createStatement().execute("UPDATE STATISTICS " + tableName); + + // With stats, expect the idx1 plan (range on c1), which scans less data; despite its name, dataPlan below is that idx1 plan. + rs = conn.createStatement().executeQuery("explain " + query); + plan = QueryUtil.getExplainPlan(rs); + String dataPlan = + "CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [1,10] - [1,20]\n" + + " SERVER FILTER BY (\"C2\" < 9000 AND \"C3\" < 5000)\n" + + "CLIENT MERGE SORT"; + assertTrue("Expected '" + dataPlan + "' in the plan:\n" + plan + ".", + plan.contains(dataPlan)); + } finally { + conn.close(); + } + } + + @Test + public void testCostOverridesStaticPlanOrderingInUpsertQuery() throws Exception { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + Connection conn = DriverManager.getConnection(getUrl(), props); + conn.setAutoCommit(true); + try { + String tableName = BaseTest.generateUniqueName(); + conn.createStatement().execute("CREATE TABLE " + tableName + " (\n" + + "rowkey VARCHAR PRIMARY KEY,\n" + + "c1 INTEGER,\n" + + "c2 INTEGER,\n" + + "c3 INTEGER)"); + conn.createStatement().execute("CREATE LOCAL INDEX " + tableName + "_idx1 ON " + tableName + " (c1) INCLUDE (c2, c3)"); + conn.createStatement().execute("CREATE LOCAL INDEX " + tableName + "_idx2 ON " + tableName + " (c2, c3) INCLUDE
(c1)"); + + String query = "UPSERT INTO " + tableName + " SELECT * FROM " + tableName + " where c1 BETWEEN 10 AND 20 AND c2 < 9000 AND C3 < 5000"; + // Without stats, expect the idx2 plan, which has a wider PK slot span. + ResultSet rs = conn.createStatement().executeQuery("explain " + query); + String plan = QueryUtil.getExplainPlan(rs); + String indexPlan = + "UPSERT SELECT\n" + + "CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [2,*] - [2,9,000]\n" + + " SERVER FILTER BY ((\"C1\" >= 10 AND \"C1\" <= 20) AND TO_INTEGER(\"C3\") < 5000)\n" + + "CLIENT MERGE SORT"; + assertTrue("Expected '" + indexPlan + "' in the plan:\n" + plan + ".", + plan.contains(indexPlan)); + + PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " (rowkey, c1, c2, c3) VALUES (?, ?, ?, ?)"); + for (int i = 0; i < 10000; i++) { + stmt.setString(1, "k" + i); + stmt.setInt(2, i); + stmt.setInt(3, i); + stmt.setInt(4, i); + stmt.execute(); + } + + conn.createStatement().execute("UPDATE STATISTICS " + tableName); + + // With stats, expect the idx1 plan (range on c1), which scans less data; despite its name, dataPlan below is that idx1 plan.
+ rs = conn.createStatement().executeQuery("explain " + query); + plan = QueryUtil.getExplainPlan(rs); + String dataPlan = + "UPSERT SELECT\n" + + "CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [1,10] - [1,20]\n" + + " SERVER FILTER BY (\"C2\" < 9000 AND \"C3\" < 5000)\n" + + "CLIENT MERGE SORT"; + assertTrue("Expected '" + dataPlan + "' in the plan:\n" + plan + ".", + plan.contains(dataPlan)); + } finally { + conn.close(); + } + } + + @Test + public void testCostOverridesStaticPlanOrderingInDeleteQuery() throws Exception { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + Connection conn = DriverManager.getConnection(getUrl(), props); + conn.setAutoCommit(true); + try { + String tableName = BaseTest.generateUniqueName(); + conn.createStatement().execute("CREATE TABLE " + tableName + " (\n" + + "rowkey VARCHAR PRIMARY KEY,\n" + + "c1 INTEGER,\n" + + "c2 INTEGER,\n" + + "c3 INTEGER)"); + conn.createStatement().execute("CREATE LOCAL INDEX " + tableName + "_idx1 ON " + tableName + " (c1) INCLUDE (c2, c3)"); + conn.createStatement().execute("CREATE LOCAL INDEX " + tableName + "_idx2 ON " + tableName + " (c2, c3) INCLUDE (c1)"); + + String query = "DELETE FROM " + tableName + " where c1 BETWEEN 10 AND 20 AND c2 < 9000 AND C3 < 5000"; + // Without stats, expect the idx2 plan, which has a wider PK slot span.
+ ResultSet rs = conn.createStatement().executeQuery("explain " + query); + String plan = QueryUtil.getExplainPlan(rs); + String indexPlan = + "DELETE ROWS\n" + + "CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [2,*] - [2,9,000]\n" + + " SERVER FILTER BY ((\"C1\" >= 10 AND \"C1\" <= 20) AND TO_INTEGER(\"C3\") < 5000)\n" + + "CLIENT MERGE SORT"; + assertTrue("Expected '" + indexPlan + "' in the plan:\n" + plan + ".", + plan.contains(indexPlan)); + + PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " (rowkey, c1, c2, c3) VALUES (?, ?, ?, ?)"); + for (int i = 0; i < 10000; i++) { + stmt.setString(1, "k" + i); + stmt.setInt(2, i); + stmt.setInt(3, i); + stmt.setInt(4, i); + stmt.execute(); + } + + conn.createStatement().execute("UPDATE STATISTICS " + tableName); + + // With stats, expect the idx1 plan (range on c1), which scans less data; despite its name, dataPlan below is that idx1 plan. + rs = conn.createStatement().executeQuery("explain " + query); + plan = QueryUtil.getExplainPlan(rs); + String dataPlan = + "DELETE ROWS\n" + + "CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [1,10] - [1,20]\n" + + " SERVER FILTER BY (\"C2\" < 9000 AND \"C3\" < 5000)\n" + + "CLIENT MERGE SORT"; + assertTrue("Expected '" + dataPlan + "' in the plan:\n" + plan + ".", + plan.contains(dataPlan)); + } finally { + conn.close(); + } + } + + @Test + public void testCostOverridesStaticPlanOrderingInUnionQuery() throws Exception { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + Connection conn = DriverManager.getConnection(getUrl(), props); + conn.setAutoCommit(true); + try { + String tableName = BaseTest.generateUniqueName(); + conn.createStatement().execute("CREATE TABLE " + tableName + " (\n" + + "rowkey VARCHAR PRIMARY KEY,\n" + + "c1 VARCHAR,\n" + + "c2 VARCHAR)"); + conn.createStatement().execute("CREATE LOCAL INDEX " + tableName + "_idx ON " + tableName + " (c1)"); + + String query = "SELECT c1, max(rowkey), max(c2) FROM " + tableName + " where rowkey LIKE 'k%' GROUP BY c1 " + +
"UNION ALL SELECT rowkey, max(c1), max(c2) FROM " + tableName + " where c1 LIKE 'X%' GROUP BY rowkey"; + // Without stats, expect the default plan from static plan ordering. + ResultSet rs = conn.createStatement().executeQuery("explain " + query); + String plan = QueryUtil.getExplainPlan(rs); + String defaultPlan = + "UNION ALL OVER 2 QUERIES\n" + + " CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " ['k'] - ['l']\n" + + " SERVER AGGREGATE INTO DISTINCT ROWS BY [C1]\n" + + " CLIENT MERGE SORT\n" + + " CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [1,'X'] - [1,'Y']\n" + + " SERVER FILTER BY FIRST KEY ONLY\n" + + " SERVER AGGREGATE INTO DISTINCT ROWS BY [\"ROWKEY\"]\n" + + " CLIENT MERGE SORT"; + assertTrue("Expected '" + defaultPlan + "' in the plan:\n" + plan + ".", + plan.contains(defaultPlan)); + + PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " (rowkey, c1, c2) VALUES (?, ?, ?)"); + for (int i = 0; i < 10000; i++) { + int c1 = i % 16; + stmt.setString(1, "k" + i); + stmt.setString(2, "X" + Integer.toHexString(c1) + c1); + stmt.setString(3, "c"); + stmt.execute(); + } + + conn.createStatement().execute("UPDATE STATISTICS " + tableName); + + // With stats, expect the lowest-cost plan for each UNION ALL branch.
+ rs = conn.createStatement().executeQuery("explain " + query); + plan = QueryUtil.getExplainPlan(rs); + String optimizedPlan = + "UNION ALL OVER 2 QUERIES\n" + + " CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [1]\n" + + " SERVER FILTER BY FIRST KEY ONLY AND \"ROWKEY\" LIKE 'k%'\n" + + " SERVER AGGREGATE INTO ORDERED DISTINCT ROWS BY [\"C1\"]\n" + + " CLIENT MERGE SORT\n" + + " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + tableName + "\n" + + " SERVER FILTER BY C1 LIKE 'X%'\n" + + " SERVER AGGREGATE INTO ORDERED DISTINCT ROWS BY [ROWKEY]"; + assertTrue("Expected '" + optimizedPlan + "' in the plan:\n" + plan + ".", + plan.contains(optimizedPlan)); + } finally { + conn.close(); + } + } + + @Test + public void testCostOverridesStaticPlanOrderingInJoinQuery() throws Exception { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + Connection conn = DriverManager.getConnection(getUrl(), props); + conn.setAutoCommit(true); + try { + String tableName = BaseTest.generateUniqueName(); + conn.createStatement().execute("CREATE TABLE " + tableName + " (\n" + + "rowkey VARCHAR PRIMARY KEY,\n" + + "c1 VARCHAR,\n" + + "c2 VARCHAR)"); + conn.createStatement().execute("CREATE LOCAL INDEX " + tableName + "_idx ON " + tableName + " (c1)"); + + String query = "SELECT t1.rowkey, t1.c1, t1.c2, mc1, mc2 FROM " + tableName + " t1 " + + "JOIN (SELECT rowkey, max(c1) mc1, max(c2) mc2 FROM " + tableName + " where c1 LIKE 'X%' GROUP BY rowkey) t2 " + + "ON t1.rowkey = t2.rowkey WHERE t1.c1 LIKE 'X0%' ORDER BY t1.rowkey"; + // Without stats, expect the default plan from static plan ordering.
+ ResultSet rs = conn.createStatement().executeQuery("explain " + query); + String plan = QueryUtil.getExplainPlan(rs); + String defaultPlan = + "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + tableName + "\n" + + " SERVER FILTER BY C1 LIKE 'X0%'\n" + + " PARALLEL INNER-JOIN TABLE 0\n" + + " CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [1,'X'] - [1,'Y']\n" + + " SERVER FILTER BY FIRST KEY ONLY\n" + + " SERVER AGGREGATE INTO DISTINCT ROWS BY [\"ROWKEY\"]\n" + + " CLIENT MERGE SORT\n" + + " DYNAMIC SERVER FILTER BY T1.ROWKEY IN (T2.ROWKEY)"; + assertTrue("Expected '" + defaultPlan + "' in the plan:\n" + plan + ".", + plan.contains(defaultPlan)); + + PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " (rowkey, c1, c2) VALUES (?, ?, ?)"); + for (int i = 0; i < 10000; i++) { + int c1 = i % 16; + stmt.setString(1, "k" + i); + stmt.setString(2, "X" + Integer.toHexString(c1) + c1); + stmt.setString(3, "c"); + stmt.execute(); + } + + conn.createStatement().execute("UPDATE STATISTICS " + tableName); + + // With stats, expect the lowest-cost plan for both sides of the join.
+ rs = conn.createStatement().executeQuery("explain " + query); + plan = QueryUtil.getExplainPlan(rs); + String optimizedPlan = + "CLIENT PARALLEL 626-WAY RANGE SCAN OVER " + tableName + " [1,'X0'] - [1,'X1']\n" + + " SERVER FILTER BY FIRST KEY ONLY\n" + + " SERVER SORTED BY [\"T1.:ROWKEY\"]\n" + + "CLIENT MERGE SORT\n" + + " PARALLEL INNER-JOIN TABLE 0\n" + + " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + tableName + "\n" + + " SERVER FILTER BY C1 LIKE 'X%'\n" + + " SERVER AGGREGATE INTO ORDERED DISTINCT ROWS BY [ROWKEY]\n" + + " DYNAMIC SERVER FILTER BY \"T1.:ROWKEY\" IN (T2.ROWKEY)"; + assertTrue("Expected '" + optimizedPlan + "' in the plan:\n" + plan + ".", + plan.contains(optimizedPlan)); + } finally { + conn.close(); + } + } + + @Test + public void testHintOverridesCost() throws Exception { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + Connection conn = DriverManager.getConnection(getUrl(), props); + conn.setAutoCommit(true); + try { + String tableName = BaseTest.generateUniqueName(); + conn.createStatement().execute("CREATE TABLE " + tableName + " (\n" + + "rowkey INTEGER PRIMARY KEY,\n" + + "c1 VARCHAR,\n" + + "c2 VARCHAR)"); + conn.createStatement().execute("CREATE LOCAL INDEX " + tableName + "_idx ON " + tableName + " (c1)"); + + String query = "SELECT rowkey, c1, c2 FROM " + tableName + " where rowkey between 1 and 10 ORDER BY c1"; + String hintedQuery = query.replaceFirst("SELECT", + "SELECT /*+ INDEX(" + tableName + " " + tableName + "_idx) */"); + String dataPlan = "SERVER SORTED BY [C1]"; + String indexPlan = "SERVER FILTER BY FIRST KEY ONLY AND (\"ROWKEY\" >= 1 AND \"ROWKEY\" <= 10)"; + + // Without stats, expect the index table plan, which avoids a sort for ORDER BY c1.
+ ResultSet rs = conn.createStatement().executeQuery("explain " + query); + String plan = QueryUtil.getExplainPlan(rs); + assertTrue("Expected '" + indexPlan + "' in the plan:\n" + plan + ".", + plan.contains(indexPlan)); + + PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " (rowkey, c1, c2) VALUES (?, ?, ?)"); + for (int i = 0; i < 10000; i++) { + int c1 = i % 16; + stmt.setInt(1, i); + stmt.setString(2, "X" + Integer.toHexString(c1) + c1); + stmt.setString(3, "c"); + stmt.execute(); + } + + conn.createStatement().execute("UPDATE STATISTICS " + tableName); + + // With stats, expect the data table plan, whose cost is now lower. + rs = conn.createStatement().executeQuery("explain " + query); + plan = QueryUtil.getExplainPlan(rs); + assertTrue("Expected '" + dataPlan + "' in the plan:\n" + plan + ".", + plan.contains(dataPlan)); + + // The INDEX hint forces the index table plan even though it is not the lowest-cost plan. + rs = conn.createStatement().executeQuery("explain " + hintedQuery); + plan = QueryUtil.getExplainPlan(rs); + assertTrue("Expected '" + indexPlan + "' in the plan:\n" + plan + ".", + plan.contains(indexPlan)); + } finally { + conn.close(); + } + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/6807dacc/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java index 2d5f360..36782c1 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.phoenix.end2end; import static org.junit.Assert.assertEquals; http://git-wip-us.apache.org/repos/asf/phoenix/blob/6807dacc/phoenix-core/src/it/java/org/apache/phoenix/end2end/SystemCatalogIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SystemCatalogIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SystemCatalogIT.java index 8a9bca2..7b6a543 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SystemCatalogIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SystemCatalogIT.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.phoenix.end2end; import static org.junit.Assert.assertEquals; http://git-wip-us.apache.org/repos/asf/phoenix/blob/6807dacc/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java index 839e7c9..0688b94 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java @@ -49,6 +49,7 @@ import org.apache.phoenix.iterate.ResultIterator; import org.apache.phoenix.jdbc.PhoenixParameterMetaData; import org.apache.phoenix.jdbc.PhoenixStatement; import org.apache.phoenix.jdbc.PhoenixStatement.Operation; +import org.apache.phoenix.optimize.Cost; import org.apache.phoenix.parse.FilterableStatement; import org.apache.phoenix.parse.LiteralParseNode; import org.apache.phoenix.parse.ParseNodeFactory; @@ -186,6 +187,11 @@ public class ListJarsQueryPlan implements QueryPlan { } @Override + public Cost getCost() { + return Cost.ZERO; /* This plan has no table ref (getTableRef() returns null), so its cost is reported as zero. */ + } + + @Override public TableRef getTableRef() { return null; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/6807dacc/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java index f7cdcbf..ca88984 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java @@ -26,6 +26,7 @@ import
org.apache.phoenix.compile.GroupByCompiler.GroupBy; import org.apache.phoenix.compile.OrderByCompiler.OrderBy; import org.apache.phoenix.iterate.ParallelScanGrouper; import org.apache.phoenix.iterate.ResultIterator; +import org.apache.phoenix.optimize.Cost; import org.apache.phoenix.parse.FilterableStatement; import org.apache.phoenix.query.KeyRange; import org.apache.phoenix.schema.TableRef; @@ -52,7 +53,9 @@ public interface QueryPlan extends StatementPlan { public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException; public long getEstimatedSize(); - + + /** Returns the estimated cost of executing this plan; the cost-based optimizer compares these costs across candidate plans. */ public Cost getCost(); + + // TODO: change once joins are supported TableRef getTableRef(); /** http://git-wip-us.apache.org/repos/asf/phoenix/blob/6807dacc/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java index 62e6991..2714858 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java @@ -46,6 +46,7 @@ import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixStatement; import org.apache.phoenix.jdbc.PhoenixStatement.Operation; import org.apache.phoenix.metrics.MetricInfo; +import org.apache.phoenix.optimize.Cost; import org.apache.phoenix.parse.FilterableStatement; import org.apache.phoenix.parse.LiteralParseNode; import org.apache.phoenix.parse.ParseNodeFactory; @@ -194,6 +195,11 @@ public class TraceQueryPlan implements QueryPlan { } @Override + public Cost getCost() { + return Cost.ZERO; /* This plan reports no source tables (getSourceRefs() is empty), so its cost is reported as zero. */ + } + + @Override public Set<TableRef> getSourceRefs() { return Collections.emptySet(); }
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6807dacc/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java index 4c29abe..369769e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java @@ -59,6 +59,7 @@ import org.apache.phoenix.iterate.SequenceResultIterator; import org.apache.phoenix.iterate.SerialIterators; import org.apache.phoenix.iterate.SpoolingResultIterator; import org.apache.phoenix.iterate.UngroupedAggregatingResultIterator; +import org.apache.phoenix.optimize.Cost; import org.apache.phoenix.parse.FilterableStatement; import org.apache.phoenix.parse.HintNode; import org.apache.phoenix.query.KeyRange; @@ -67,6 +68,7 @@ import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.PTable.IndexType; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.types.PInteger; +import org.apache.phoenix.util.CostUtil; import org.apache.phoenix.util.ScanUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -112,7 +114,33 @@ public class AggregatePlan extends BaseQueryPlan { public Expression getHaving() { return having; } - + + @Override + public Cost getCost() { /* Aggregation cost from the estimated bytes to scan, plus a sort cost when there is an ORDER BY; Cost.UNKNOWN without a byte estimate. */ + Long byteCount = null; + try { + byteCount = getEstimatedBytesToScan(); + } catch (SQLException e) { + // Best effort: on failure byteCount stays null and Cost.UNKNOWN is returned below.
+ } + + if (byteCount == null) { + return Cost.UNKNOWN; + } + + int parallelLevel = CostUtil.estimateParallelLevel( + true, context.getConnection().getQueryServices()); + Cost cost = CostUtil.estimateAggregateCost(byteCount, + groupBy, aggregators.getEstimatedByteSize(), parallelLevel); + if (!orderBy.getOrderByExpressions().isEmpty()) { /* ORDER BY: add the cost of sorting the aggregated output. */ + double outputBytes = CostUtil.estimateAggregateOutputBytes( + byteCount, groupBy, aggregators.getEstimatedByteSize()); + Cost orderByCost = CostUtil.estimateOrderByCost(outputBytes, parallelLevel); + cost = cost.plus(orderByCost); + } + return cost; + } + @Override public List<KeyRange> getSplits() { if (splits == null) http://git-wip-us.apache.org/repos/asf/phoenix/blob/6807dacc/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java index c1ddd44..31f67b7 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java @@ -63,6 +63,8 @@ import org.apache.phoenix.parse.HintNode.Hint; import org.apache.phoenix.parse.ParseNodeFactory; import org.apache.phoenix.parse.TableName; import org.apache.phoenix.query.QueryConstants; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.KeyValueSchema; import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PName; @@ -500,13 +502,24 @@ public abstract class BaseQueryPlan implements QueryPlan { if (context.getScanRanges() == ScanRanges.NOTHING) { return new ExplainPlan(Collections.singletonList("DEGENERATE SCAN OVER " + getTableRef().getTable().getName().getString())); } - + + // If the cost-based optimizer is enabled, open and immediately close a dummy
iterator to + // get the stats for computing costs. + boolean costBased = + context.getConnection().getQueryServices().getConfiguration().getBoolean( + QueryServices.COST_BASED_OPTIMIZER_ENABLED, QueryServicesOptions.DEFAULT_COST_BASED_OPTIMIZER_ENABLED); + if (costBased) { + ResultIterator iterator = iterator(); + iterator.close(); + } // Optimize here when getting explain plan, as queries don't get optimized until after compilation QueryPlan plan = context.getConnection().getQueryServices().getOptimizer().optimize(context.getStatement(), this); ExplainPlan exp = plan instanceof BaseQueryPlan ? new ExplainPlan(getPlanSteps(plan.iterator())) : plan.getExplainPlan(); - this.estimatedRows = plan.getEstimatedRowsToScan(); - this.estimatedSize = plan.getEstimatedBytesToScan(); - this.estimateInfoTimestamp = plan.getEstimateInfoTimestamp(); + if (!costBased) { // When cost-based, keep this plan's own estimates (they feed the cost calculation) instead of copying the optimized plan's. + this.estimatedRows = plan.getEstimatedRowsToScan(); + this.estimatedSize = plan.getEstimatedBytesToScan(); + this.estimateInfoTimestamp = plan.getEstimateInfoTimestamp(); + } return exp; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/6807dacc/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java index 8ef1f8d..a15ab35 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java @@ -56,12 +56,14 @@ import org.apache.phoenix.iterate.PeekingResultIterator; import org.apache.phoenix.iterate.ResultIterator; import org.apache.phoenix.iterate.SequenceResultIterator; import org.apache.phoenix.iterate.UngroupedAggregatingResultIterator; +import
org.apache.phoenix.optimize.Cost; import org.apache.phoenix.parse.FilterableStatement; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.tuple.MultiKeyValueTuple; import org.apache.phoenix.schema.tuple.Tuple; +import org.apache.phoenix.util.CostUtil; import org.apache.phoenix.util.TupleUtil; import com.google.common.collect.Lists; @@ -87,6 +89,32 @@ public class ClientAggregatePlan extends ClientProcessingPlan { } @Override + public Cost getCost() { /* super.getCost() plus client-side aggregation (and ORDER BY sort) cost; Cost.UNKNOWN without a byte estimate. */ + Long byteCount = null; + try { + byteCount = getEstimatedBytesToScan(); + } catch (SQLException e) { + // Best effort: on failure byteCount stays null and Cost.UNKNOWN is returned below. + } + + if (byteCount == null) { + return Cost.UNKNOWN; + } + + int parallelLevel = CostUtil.estimateParallelLevel( + false, context.getConnection().getQueryServices()); + Cost cost = CostUtil.estimateAggregateCost(byteCount, + groupBy, clientAggregators.getEstimatedByteSize(), parallelLevel); + if (!orderBy.getOrderByExpressions().isEmpty()) { + double outputBytes = CostUtil.estimateAggregateOutputBytes( + byteCount, groupBy, clientAggregators.getEstimatedByteSize()); + Cost orderByCost = CostUtil.estimateOrderByCost(outputBytes, parallelLevel); + cost = cost.plus(orderByCost); + } + return super.getCost().plus(cost); + } + + @Override public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException { ResultIterator iterator = delegate.iterator(scanGrouper, scan); if (where != null) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/6807dacc/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java index 6bbc545..5799990 100644 ---
a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java @@ -34,10 +34,12 @@ import org.apache.phoenix.iterate.OrderedResultIterator; import org.apache.phoenix.iterate.ParallelScanGrouper; import org.apache.phoenix.iterate.ResultIterator; import org.apache.phoenix.iterate.SequenceResultIterator; +import org.apache.phoenix.optimize.Cost; import org.apache.phoenix.parse.FilterableStatement; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.TableRef; +import org.apache.phoenix.util.CostUtil; import com.google.common.collect.Lists; @@ -50,6 +52,29 @@ public class ClientScanPlan extends ClientProcessingPlan { } @Override + public Cost getCost() { + Long byteCount = null; + try { + byteCount = getEstimatedBytesToScan(); + } catch (SQLException e) { + // ignored. + } + + if (byteCount == null) { + return Cost.UNKNOWN; + } + + Cost cost = new Cost(0, 0, byteCount); + int parallelLevel = CostUtil.estimateParallelLevel( + false, context.getConnection().getQueryServices()); + if (!orderBy.getOrderByExpressions().isEmpty()) { + Cost orderByCost = CostUtil.estimateOrderByCost(byteCount, parallelLevel); + cost = cost.plus(orderByCost); + } + return super.getCost().plus(cost); + } + + @Override public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException { ResultIterator iterator = delegate.iterator(scanGrouper, scan); if (where != null) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/6807dacc/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java index ee81c36..270ad3d 100644 --- 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java @@ -30,6 +30,7 @@ import org.apache.phoenix.exception.SQLExceptionInfo; import org.apache.phoenix.execute.TupleProjector.ProjectedValueTuple; import org.apache.phoenix.iterate.ParallelScanGrouper; import org.apache.phoenix.iterate.ResultIterator; +import org.apache.phoenix.optimize.Cost; import org.apache.phoenix.parse.JoinTableNode.JoinType; import org.apache.phoenix.schema.KeyValueSchema; import org.apache.phoenix.schema.PColumn; @@ -200,4 +201,28 @@ public class CorrelatePlan extends DelegateQueryPlan { return null; } + @Override + public Cost getCost() { + Long lhsByteCount = null; + try { + lhsByteCount = delegate.getEstimatedBytesToScan(); + } catch (SQLException e) { + // ignored. + } + Long rhsRowCount = null; + try { + rhsRowCount = rhs.getEstimatedRowsToScan(); + } catch (SQLException e) { + // ignored. + } + + if (lhsByteCount == null || rhsRowCount == null) { + return Cost.UNKNOWN; + } + + Cost cost = new Cost(0, 0, lhsByteCount * rhsRowCount); + Cost lhsCost = delegate.getCost(); + return cost.plus(lhsCost).plus(rhs.getCost()); + } + } http://git-wip-us.apache.org/repos/asf/phoenix/blob/6807dacc/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java index 3c62c5b..3da06db 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java @@ -32,6 +32,7 @@ import org.apache.phoenix.compile.QueryPlan; import org.apache.phoenix.compile.RowProjector; import org.apache.phoenix.compile.StatementContext; import 
org.apache.phoenix.jdbc.PhoenixStatement.Operation; +import org.apache.phoenix.optimize.Cost; import org.apache.phoenix.parse.FilterableStatement; import org.apache.phoenix.query.KeyRange; import org.apache.phoenix.schema.TableRef; @@ -59,6 +60,11 @@ public abstract class DelegateQueryPlan implements QueryPlan { } @Override + public Cost getCost() { + return delegate.getCost(); + } + + @Override public TableRef getTableRef() { return delegate.getTableRef(); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/6807dacc/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java index 2b90dcb..2d2ff4e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java @@ -62,6 +62,7 @@ import org.apache.phoenix.job.JobManager.JobCallable; import org.apache.phoenix.join.HashCacheClient; import org.apache.phoenix.join.HashJoinInfo; import org.apache.phoenix.monitoring.TaskExecutionMetricsHolder; +import org.apache.phoenix.optimize.Cost; import org.apache.phoenix.parse.FilterableStatement; import org.apache.phoenix.parse.ParseNode; import org.apache.phoenix.parse.SQLParser; @@ -290,6 +291,34 @@ public class HashJoinPlan extends DelegateQueryPlan { return statement; } + @Override + public Cost getCost() { + Long byteCount = null; + try { + byteCount = getEstimatedBytesToScan(); + } catch (SQLException e) { + // ignored. + } + + if (byteCount == null) { + return Cost.UNKNOWN; + } + + Cost cost = new Cost(0, 0, byteCount); + Cost lhsCost = delegate.getCost(); + if (keyRangeExpressions != null) { + // The selectivity of the dynamic rowkey filter. + // TODO replace the constant with an estimate value. 
+ double selectivity = 0.01; + lhsCost = lhsCost.multiplyBy(selectivity); + } + Cost rhsCost = Cost.ZERO; + for (SubPlan subPlan : subPlans) { + rhsCost = rhsCost.plus(subPlan.getInnerPlan().getCost()); + } + return cost.plus(lhsCost).plus(rhsCost); + } + protected interface SubPlan { public ServerCache execute(HashJoinPlan parent) throws SQLException; public void postProcess(ServerCache result, HashJoinPlan parent) throws SQLException; http://git-wip-us.apache.org/repos/asf/phoenix/blob/6807dacc/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java index 86f59c5..1d1332d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java @@ -35,6 +35,7 @@ import org.apache.phoenix.iterate.ParallelIteratorFactory; import org.apache.phoenix.iterate.ParallelScanGrouper; import org.apache.phoenix.iterate.ResultIterator; import org.apache.phoenix.iterate.SequenceResultIterator; +import org.apache.phoenix.optimize.Cost; import org.apache.phoenix.parse.FilterableStatement; import org.apache.phoenix.query.KeyRange; import org.apache.phoenix.schema.TableRef; @@ -60,6 +61,11 @@ public class LiteralResultIterationPlan extends BaseQueryPlan { } @Override + public Cost getCost() { + return Cost.ZERO; + } + + @Override public List<KeyRange> getSplits() { return Collections.emptyList(); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/6807dacc/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java index 1e1cb0d..31d7097 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java @@ -53,6 +53,7 @@ import org.apache.phoenix.iterate.RoundRobinResultIterator; import org.apache.phoenix.iterate.SequenceResultIterator; import org.apache.phoenix.iterate.SerialIterators; import org.apache.phoenix.iterate.SpoolingResultIterator; +import org.apache.phoenix.optimize.Cost; import org.apache.phoenix.parse.FilterableStatement; import org.apache.phoenix.parse.HintNode; import org.apache.phoenix.query.ConnectionQueryServices; @@ -64,6 +65,7 @@ import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTable.IndexType; import org.apache.phoenix.schema.SaltingUtil; import org.apache.phoenix.schema.TableRef; +import org.apache.phoenix.util.CostUtil; import org.apache.phoenix.util.EnvironmentEdgeManager; import org.apache.phoenix.util.QueryUtil; import org.apache.phoenix.util.ScanUtil; @@ -189,6 +191,29 @@ public class ScanPlan extends BaseQueryPlan { } @Override + public Cost getCost() { + Long byteCount = null; + try { + byteCount = getEstimatedBytesToScan(); + } catch (SQLException e) { + // ignored. 
+ } + + if (byteCount == null) { + return Cost.UNKNOWN; + } + + Cost cost = new Cost(0, 0, byteCount); + int parallelLevel = CostUtil.estimateParallelLevel( + true, context.getConnection().getQueryServices()); + if (!orderBy.getOrderByExpressions().isEmpty()) { + Cost orderByCost = CostUtil.estimateOrderByCost(byteCount, parallelLevel); + cost = cost.plus(orderByCost); + } + return cost; + } + + @Override public List<KeyRange> getSplits() { if (splits == null) return Collections.emptyList(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/6807dacc/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java index fab7c59..3e380da 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java @@ -54,6 +54,7 @@ import org.apache.phoenix.iterate.ParallelScanGrouper; import org.apache.phoenix.iterate.ResultIterator; import org.apache.phoenix.jdbc.PhoenixParameterMetaData; import org.apache.phoenix.jdbc.PhoenixStatement.Operation; +import org.apache.phoenix.optimize.Cost; import org.apache.phoenix.parse.FilterableStatement; import org.apache.phoenix.parse.JoinTableNode.JoinType; import org.apache.phoenix.query.KeyRange; @@ -192,6 +193,23 @@ public class SortMergeJoinPlan implements QueryPlan { } @Override + public Cost getCost() { + Long byteCount = null; + try { + byteCount = getEstimatedBytesToScan(); + } catch (SQLException e) { + // ignored. 
+ } + + if (byteCount == null) { + return Cost.UNKNOWN; + } + + Cost cost = new Cost(0, 0, byteCount); + return cost.plus(lhsPlan.getCost()).plus(rhsPlan.getCost()); + } + + @Override public StatementContext getContext() { return context; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/6807dacc/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java index e06522f..e6bf654 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java @@ -43,6 +43,7 @@ import org.apache.phoenix.iterate.ParallelScanGrouper; import org.apache.phoenix.iterate.ResultIterator; import org.apache.phoenix.iterate.UnionResultIterators; import org.apache.phoenix.jdbc.PhoenixStatement.Operation; +import org.apache.phoenix.optimize.Cost; import org.apache.phoenix.parse.FilterableStatement; import org.apache.phoenix.query.KeyRange; import org.apache.phoenix.schema.TableRef; @@ -210,6 +211,15 @@ public class UnionPlan implements QueryPlan { } @Override + public Cost getCost() { + Cost cost = Cost.ZERO; + for (QueryPlan plan : plans) { + cost = cost.plus(plan.getCost()); + } + return cost; + } + + @Override public ParameterMetaData getParameterMetaData() { return paramMetaData; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/6807dacc/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java index 384c8cc..26b4415 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java +++ 
b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java @@ -91,6 +91,7 @@ import org.apache.phoenix.expression.RowKeyColumnExpression; import org.apache.phoenix.iterate.MaterializedResultIterator; import org.apache.phoenix.iterate.ParallelScanGrouper; import org.apache.phoenix.iterate.ResultIterator; +import org.apache.phoenix.optimize.Cost; import org.apache.phoenix.parse.AddColumnStatement; import org.apache.phoenix.parse.AddJarsStatement; import org.apache.phoenix.parse.AliasedNode; @@ -647,6 +648,11 @@ public class PhoenixStatement implements Statement, SQLCloseable { } @Override + public Cost getCost() { + return Cost.ZERO; + } + + @Override public TableRef getTableRef() { return null; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/6807dacc/phoenix-core/src/main/java/org/apache/phoenix/optimize/Cost.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/optimize/Cost.java b/phoenix-core/src/main/java/org/apache/phoenix/optimize/Cost.java new file mode 100644 index 0000000..b83f354 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/optimize/Cost.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.optimize; + +import java.util.Objects; + +/** + * Optimizer cost in terms of CPU, memory, and I/O usage, the unit of which is now the + * number of bytes processed. + * + */ +public class Cost implements Comparable<Cost> { + /** The unknown cost. */ + public static Cost UNKNOWN = new Cost(Double.NaN, Double.NaN, Double.NaN) { + @Override + public String toString() { + return "{unknown}"; + } + }; + + /** The zero cost. */ + public static Cost ZERO = new Cost(0, 0, 0) { + @Override + public String toString() { + return "{zero}"; + } + }; + + private final double cpu; + private final double memory; + private final double io; + + public Cost(double cpu, double memory, double io) { + this.cpu = cpu; + this.memory = memory; + this.io = io; + } + + public double getCpu() { + return cpu; + } + + public double getMemory() { + return memory; + } + + public double getIo() { + return io; + } + + public boolean isUnknown() { + return this == UNKNOWN; + } + + public Cost plus(Cost other) { + if (isUnknown() || other.isUnknown()) { + return UNKNOWN; + } + + return new Cost( + this.cpu + other.cpu, + this.memory + other.memory, + this.io + other.io); + } + + public Cost multiplyBy(double factor) { + if (isUnknown()) { + return UNKNOWN; + } + + return new Cost( + this.cpu * factor, + this.memory * factor, + this.io * factor); + } + + // TODO right now for simplicity, we choose to ignore CPU and memory costs. We may + // add those into account as our cost model mature. + @Override + public int compareTo(Cost other) { + if (isUnknown() && other.isUnknown()) { + return 0; + } else if (isUnknown() && !other.isUnknown()) { + return 1; + } else if (!isUnknown() && other.isUnknown()) { + return -1; + } + + double d = this.io - other.io; + return d == 0 ? 0 : (d > 0 ? 
1 : -1); + } + + @Override + public boolean equals(Object obj) { + return this == obj + || (obj instanceof Cost && this.compareTo((Cost) obj) == 0); + } + + @Override + public int hashCode() { + return Objects.hash(cpu, memory, io); + } + + @Override + public String toString() { + return "{cpu: " + cpu + ", memory: " + memory + ", io: " + io + "}"; + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/6807dacc/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java index b3df50b..64dad58 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java @@ -67,10 +67,12 @@ public class QueryOptimizer { private final QueryServices services; private final boolean useIndexes; + private final boolean costBased; public QueryOptimizer(QueryServices services) { this.services = services; this.useIndexes = this.services.getProps().getBoolean(QueryServices.USE_INDEXES_ATTRIB, QueryServicesOptions.DEFAULT_USE_INDEXES); + this.costBased = this.services.getProps().getBoolean(QueryServices.COST_BASED_OPTIMIZER_ENABLED, QueryServicesOptions.DEFAULT_COST_BASED_OPTIMIZER_ENABLED); } public QueryPlan optimize(PhoenixStatement statement, QueryPlan dataPlan) throws SQLException { @@ -91,7 +93,7 @@ public class QueryOptimizer { } public QueryPlan optimize(QueryPlan dataPlan, PhoenixStatement statement, List<? 
extends PDatum> targetColumns, ParallelIteratorFactory parallelIteratorFactory) throws SQLException { - List<QueryPlan>plans = getApplicablePlans(dataPlan, statement, targetColumns, parallelIteratorFactory, true); + List<QueryPlan> plans = getApplicablePlans(dataPlan, statement, targetColumns, parallelIteratorFactory, true); return plans.get(0); } @@ -309,10 +311,11 @@ public class QueryOptimizer { } return null; } - + /** * Order the plans among all the possible ones from best to worst. - * Since we don't keep stats yet, we use the following simple algorithm: + * If option COST_BASED_OPTIMIZER_ENABLED is on and stats are available, we order the plans based on + * their costs, otherwise we use the following simple algorithm: * 1) If the query is a point lookup (i.e. we have a set of exact row keys), choose that one immediately. * 2) If the query has an ORDER BY and a LIMIT, choose the plan that has all the ORDER BY expression * in the same order as the row key columns. @@ -320,9 +323,6 @@ public class QueryOptimizer { * a) the most row key columns that may be used to form the start/stop scan key (i.e. bound slots). * b) the plan that preserves ordering for a group by. * c) the non local index table plan - * TODO: We should make more of a cost based choice: The largest number of bound slots does not necessarily - * correspond to the least bytes scanned. We could consider the slots bound for upper and lower ranges - * separately, or we could calculate the bytes scanned between the start and stop row of each table. * @param plans the list of candidate plans * @return list of plans ordered from best to worst. 
*/ @@ -331,7 +331,21 @@ public class QueryOptimizer { if (plans.size() == 1) { return plans; } - + + if (this.costBased) { + Collections.sort(plans, new Comparator<QueryPlan>() { + @Override + public int compare(QueryPlan plan1, QueryPlan plan2) { + return plan1.getCost().compareTo(plan2.getCost()); + } + }); + // Return ordered list based on cost if stats are available; otherwise fall + // back to static ordering. + if (!plans.get(0).getCost().isUnknown()) { + return stopAtBestPlan ? plans.subList(0, 1) : plans; + } + } + /** * If we have a plan(s) that are just point lookups (i.e. fully qualified row * keys), then favor those first. @@ -428,7 +442,7 @@ public class QueryOptimizer { } }); - + return stopAtBestPlan ? bestCandidates.subList(0, 1) : bestCandidates; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/6807dacc/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java index 59f7385..0b80f4d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java @@ -294,6 +294,9 @@ public interface QueryServices extends SQLCloseable { //Update Cache Frequency default config attribute public static final String DEFAULT_UPDATE_CACHE_FREQUENCY_ATRRIB = "phoenix.default.update.cache.frequency"; + // Whether to enable cost-based-decision in the query optimizer + public static final String COST_BASED_OPTIMIZER_ENABLED = "phoenix.costbased.optimizer.enabled"; + /** * Get executor service used for parallel scans */ http://git-wip-us.apache.org/repos/asf/phoenix/blob/6807dacc/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java ---------------------------------------------------------------------- diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java index 3ceb084..4d31974 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java @@ -25,6 +25,7 @@ import static org.apache.phoenix.query.QueryServices.CALL_QUEUE_PRODUCER_ATTRIB_ import static org.apache.phoenix.query.QueryServices.CALL_QUEUE_ROUND_ROBIN_ATTRIB; import static org.apache.phoenix.query.QueryServices.COLLECT_REQUEST_LEVEL_METRICS; import static org.apache.phoenix.query.QueryServices.COMMIT_STATS_ASYNC; +import static org.apache.phoenix.query.QueryServices.COST_BASED_OPTIMIZER_ENABLED; import static org.apache.phoenix.query.QueryServices.DATE_FORMAT_ATTRIB; import static org.apache.phoenix.query.QueryServices.DATE_FORMAT_TIMEZONE_ATTRIB; import static org.apache.phoenix.query.QueryServices.DELAY_FOR_SCHEMA_UPDATE_CHECK; @@ -341,6 +342,8 @@ public class QueryServicesOptions { // RS -> RS calls for upsert select statements are disabled by default public static final boolean DEFAULT_ENABLE_SERVER_UPSERT_SELECT = false; + public static final boolean DEFAULT_COST_BASED_OPTIMIZER_ENABLED = false; + private final Configuration config; private QueryServicesOptions(Configuration config) { @@ -418,6 +421,7 @@ public class QueryServicesOptions { .setIfUnset(TRACING_THREAD_POOL_SIZE, DEFAULT_TRACING_THREAD_POOL_SIZE) .setIfUnset(STATS_COLLECTION_ENABLED, DEFAULT_STATS_COLLECTION_ENABLED) .setIfUnset(USE_STATS_FOR_PARALLELIZATION, DEFAULT_USE_STATS_FOR_PARALLELIZATION) + .setIfUnset(COST_BASED_OPTIMIZER_ENABLED, DEFAULT_COST_BASED_OPTIMIZER_ENABLED) .setIfUnset(UPLOAD_BINARY_DATA_TYPE_ENCODING, DEFAULT_UPLOAD_BINARY_DATA_TYPE_ENCODING) .setIfUnset(PHOENIX_ACLS_ENABLED, DEFAULT_PHOENIX_ACLS_ENABLED); // HBase sets this to 1, so we reset it to something more appropriate. 
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6807dacc/phoenix-core/src/main/java/org/apache/phoenix/util/CostUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/CostUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/CostUtil.java new file mode 100644 index 0000000..1d4b8e0 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/CostUtil.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.util; + +import org.apache.phoenix.compile.GroupByCompiler.GroupBy; +import org.apache.phoenix.optimize.Cost; +import org.apache.phoenix.query.QueryServices; + +/** + * Utilities for computing costs. + * + * Some of the methods here should eventually be replaced by a metadata framework which + * estimates output metrics for each QueryPlan or operation, e.g. row count, byte count, + * etc. + */ +public class CostUtil { + + // An estimate of the ratio of result data from group-by against the input data. 
+ private final static double GROUPING_FACTOR = 0.1; + + // Io operations conducted in intermediate evaluations like sorting or aggregation + // should be counted twice since they usually involve both read and write. + private final static double IO_COST_MULTIPLIER = 2.0; + + /** + * Estimate the number of output bytes of an aggregate. + * @param byteCount the number of input bytes + * @param groupBy the compiled GroupBy object + * @param aggregatorsSize the byte size of aggregators + * @return the output byte count + */ + public static double estimateAggregateOutputBytes( + double byteCount, GroupBy groupBy, int aggregatorsSize) { + if (groupBy.isUngroupedAggregate()) { + return aggregatorsSize; + } + return byteCount * GROUPING_FACTOR; + } + + /** + * Estimate the cost of an aggregate. + * @param byteCount the number of input bytes + * @param groupBy the compiled GroupBy object + * @param aggregatorsSize the byte size of aggregators + * @param parallelLevel number of parallel workers or threads + * @return the cost + */ + public static Cost estimateAggregateCost( + double byteCount, GroupBy groupBy, int aggregatorsSize, int parallelLevel) { + double outputBytes = estimateAggregateOutputBytes(byteCount, groupBy, aggregatorsSize); + double orderedFactor = groupBy.isOrderPreserving() ? 
0.2 : 1.0; + return new Cost(0, 0, outputBytes * orderedFactor * IO_COST_MULTIPLIER / parallelLevel); + } + + /** + * Estimate the cost of an order-by + * @param byteCount the number of input bytes + * @param parallelLevel number of parallel workers or threads + * @return the cost + */ + public static Cost estimateOrderByCost(double byteCount, int parallelLevel) { + return new Cost(0, 0, byteCount * IO_COST_MULTIPLIER / parallelLevel); + } + + /** + * Estimate the parallel level of an operation + * @param runningOnServer if the operation will be running on server side + * @param services the QueryServices object + * @return the parallel level + */ + public static int estimateParallelLevel(boolean runningOnServer, QueryServices services) { + // TODO currently return constants for simplicity, should derive from cluster config. + return runningOnServer ? 10 : 1; + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/6807dacc/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java index 935d8cb..0f12d9c 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java @@ -52,6 +52,7 @@ import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixParameterMetaData; import org.apache.phoenix.jdbc.PhoenixStatement; import org.apache.phoenix.jdbc.PhoenixStatement.Operation; +import org.apache.phoenix.optimize.Cost; import org.apache.phoenix.parse.FilterableStatement; import org.apache.phoenix.parse.PFunction; import org.apache.phoenix.parse.PSchema; @@ -486,6 +487,11 @@ public class ParallelIteratorsSplitTest extends BaseConnectionlessQueryTest { 
public Long getEstimateInfoTimestamp() throws SQLException { return null; } + + @Override + public Cost getCost() { + return Cost.ZERO; + } }, null, new SpoolingResultIterator.SpoolingResultIteratorFactory(context.getConnection().getQueryServices()), context.getScan(), false, null); List<KeyRange> keyRanges = parallelIterators.getSplits();