PHOENIX-4010 Hash Join cache may not be sent to all regionservers when we have a stale HBase meta cache
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/6259055c Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/6259055c Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/6259055c Branch: refs/heads/4.x-HBase-0.98 Commit: 6259055ccfabded7f36a2e06106f07e0ed0a3ef6 Parents: f884b76 Author: Ankit Singhal <[email protected]> Authored: Sat Sep 9 22:32:37 2017 +0530 Committer: Ankit Singhal <[email protected]> Committed: Sat Sep 9 22:32:37 2017 +0530 ---------------------------------------------------------------------- .../org/apache/phoenix/end2end/BaseJoinIT.java | 14 + .../apache/phoenix/end2end/HashJoinCacheIT.java | 453 +++++++++++++++++++ .../org/apache/phoenix/end2end/HashJoinIT.java | 122 +++-- .../end2end/index/PartialIndexRebuilderIT.java | 23 +- .../DelayedTableResultIteratorFactory.java | 11 +- .../apache/phoenix/cache/ServerCacheClient.java | 338 +++++++++----- .../coprocessor/BaseScannerRegionObserver.java | 2 - .../HashJoinCacheNotFoundException.java | 45 ++ .../coprocessor/HashJoinRegionScanner.java | 10 +- .../phoenix/exception/SQLExceptionCode.java | 3 +- .../apache/phoenix/execute/AggregatePlan.java | 10 +- .../apache/phoenix/execute/BaseQueryPlan.java | 27 +- .../apache/phoenix/execute/HashJoinPlan.java | 16 +- .../execute/LiteralResultIterationPlan.java | 9 +- .../org/apache/phoenix/execute/ScanPlan.java | 11 +- .../phoenix/iterate/BaseResultIterators.java | 54 ++- .../DefaultTableResultIteratorFactory.java | 7 +- .../phoenix/iterate/ParallelIterators.java | 13 +- .../apache/phoenix/iterate/SerialIterators.java | 15 +- .../phoenix/iterate/TableResultIterator.java | 53 ++- .../iterate/TableResultIteratorFactory.java | 5 +- .../apache/phoenix/join/HashCacheClient.java | 20 +- .../apache/phoenix/query/QueryConstants.java | 2 + .../java/org/apache/phoenix/util/ByteUtil.java | 15 + .../org/apache/phoenix/util/ServerUtil.java | 7 + 
.../java/org/apache/phoenix/query/BaseTest.java | 1 + .../query/ParallelIteratorsSplitTest.java | 2 +- .../java/org/apache/phoenix/util/TestUtil.java | 29 ++ 28 files changed, 1030 insertions(+), 287 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/6259055c/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseJoinIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseJoinIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseJoinIT.java index 152bdf0..a823a72 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseJoinIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseJoinIT.java @@ -17,6 +17,7 @@ */ package org.apache.phoenix.end2end; +import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -28,16 +29,23 @@ import java.sql.SQLException; import java.sql.Timestamp; import java.text.SimpleDateFormat; import java.util.Map; +import java.util.Properties; import java.util.regex.Pattern; +import org.apache.hadoop.hbase.HConstants; +import org.apache.phoenix.cache.ServerCacheClient; +import org.apache.phoenix.util.PropertiesUtil; +import org.apache.phoenix.util.ReadOnlyProps; import org.apache.phoenix.util.SchemaUtil; import org.apache.phoenix.util.StringUtil; import org.junit.Before; +import org.junit.BeforeClass; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; public abstract class BaseJoinIT extends ParallelStatsDisabledIT { + protected static final String JOIN_SCHEMA = "Join"; protected static final String JOIN_ORDER_TABLE = "OrderTable"; protected static final String JOIN_CUSTOMER_TABLE = "CustomerTable"; @@ -442,6 +450,12 @@ public abstract class BaseJoinIT extends ParallelStatsDisabledIT { conn.commit(); } + 
protected Connection getConnection() throws SQLException { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + props.put(ServerCacheClient.HASH_JOIN_SERVER_CACHE_RESEND_PER_SERVER, "true"); + return DriverManager.getConnection(getUrl(), props); + } + protected void createIndexes(Connection conn, String virtualName, String realName) throws Exception { if (indexDDL != null && indexDDL.length > 0) { for (String ddl : indexDDL) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/6259055c/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinCacheIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinCacheIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinCacheIT.java new file mode 100644 index 0000000..3f60d9b --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinCacheIT.java @@ -0,0 +1,453 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.end2end; + +import java.io.IOException; +import java.sql.Connection; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Random; + +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver; +import org.apache.hadoop.hbase.regionserver.RegionScanner; +import org.apache.phoenix.cache.GlobalCache; +import org.apache.phoenix.cache.TenantCache; +import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; +import org.apache.phoenix.join.HashJoinInfo; +import org.apache.phoenix.util.ByteUtil; +import org.apache.phoenix.util.SchemaUtil; +import org.apache.phoenix.util.TestUtil; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +import com.google.common.collect.Lists; + +@RunWith(Parameterized.class) +public class HashJoinCacheIT extends HashJoinIT { + + public HashJoinCacheIT(String[] indexDDL, String[] plans) throws Exception { + super(indexDDL, plans); + } + + @Override + protected String getTableName(Connection conn, String virtualName) throws Exception { + String realName = super.getTableName(conn, virtualName); + TestUtil.addCoprocessor(conn, SchemaUtil.normalizeFullTableName(realName), InvalidateHashCache.class); + return realName; + } + + @Parameters + public static Collection<Object> data() { + List<Object> testCases = Lists.newArrayList(); + testCases.add(new String[][] { + {}, { + /* + * testLeftJoinWithAggregation() + * SELECT i.name, sum(quantity) FROM joinOrderTable o + * LEFT JOIN joinItemTable i ON o.item_id = i.item_id + * GROUP BY i.name ORDER BY i.name + */ + "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ORDER_TABLE_FULL_NAME + "\n" + + " SERVER AGGREGATE INTO DISTINCT ROWS BY 
[I.NAME]\n" + + "CLIENT MERGE SORT\n" + + " PARALLEL LEFT-JOIN TABLE 0\n" + + " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ITEM_TABLE_FULL_NAME, + /* + * testLeftJoinWithAggregation() + * SELECT i.item_id iid, sum(quantity) q FROM joinOrderTable o + * LEFT JOIN joinItemTable i ON o.item_id = i.item_id + * GROUP BY i.item_id ORDER BY q DESC" + */ + "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ORDER_TABLE_FULL_NAME + "\n" + + " SERVER AGGREGATE INTO DISTINCT ROWS BY [\"I.item_id\"]\n" + + "CLIENT MERGE SORT\n" + + "CLIENT SORTED BY [SUM(O.QUANTITY) DESC]\n" + + " PARALLEL LEFT-JOIN TABLE 0\n" + + " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ITEM_TABLE_FULL_NAME + "\n" + + " SERVER FILTER BY FIRST KEY ONLY", + /* + * testLeftJoinWithAggregation() + * SELECT i.item_id iid, sum(quantity) q FROM joinItemTable i + * LEFT JOIN joinOrderTable o ON o.item_id = i.item_id + * GROUP BY i.item_id ORDER BY q DESC NULLS LAST, iid + */ + "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ITEM_TABLE_FULL_NAME + "\n" + + " SERVER FILTER BY FIRST KEY ONLY\n" + + " SERVER AGGREGATE INTO ORDERED DISTINCT ROWS BY [\"I.item_id\"]\n" + + "CLIENT SORTED BY [SUM(O.QUANTITY) DESC NULLS LAST, \"I.item_id\"]\n" + + " PARALLEL LEFT-JOIN TABLE 0\n" + + " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ORDER_TABLE_FULL_NAME, + /* + * testRightJoinWithAggregation() + * SELECT i.name, sum(quantity) FROM joinOrderTable o + * RIGHT JOIN joinItemTable i ON o.item_id = i.item_id + * GROUP BY i.name ORDER BY i.name + */ + "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ITEM_TABLE_FULL_NAME + "\n" + + " SERVER AGGREGATE INTO DISTINCT ROWS BY [I.NAME]\n" + + "CLIENT MERGE SORT\n" + + " PARALLEL LEFT-JOIN TABLE 0\n" + + " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ORDER_TABLE_FULL_NAME, + /* + * testRightJoinWithAggregation() + * SELECT i.item_id iid, sum(quantity) q FROM joinOrderTable o + * RIGHT JOIN joinItemTable i ON o.item_id = i.item_id + * GROUP BY i.item_id ORDER BY q DESC NULLS LAST, iid + */ 
+ "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ITEM_TABLE_FULL_NAME + "\n" + + " SERVER FILTER BY FIRST KEY ONLY\n" + + " SERVER AGGREGATE INTO ORDERED DISTINCT ROWS BY [\"I.item_id\"]\n" + + "CLIENT SORTED BY [SUM(O.QUANTITY) DESC NULLS LAST, \"I.item_id\"]\n" + + " PARALLEL LEFT-JOIN TABLE 0\n" + + " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ORDER_TABLE_FULL_NAME, + /* + * testJoinWithWildcard() + * SELECT * FROM joinItemTable LEFT JOIN joinSupplierTable supp + * ON joinItemTable.supplier_id = supp.supplier_id + * ORDER BY item_id + */ + "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ITEM_TABLE_FULL_NAME + "\n" + + " PARALLEL LEFT-JOIN TABLE 0\n" + + " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_SUPPLIER_TABLE_FULL_NAME, + /* + * testJoinPlanWithIndex() + * SELECT item.item_id, item.name, supp.supplier_id, supp.name + * FROM joinItemTable item LEFT JOIN joinSupplierTable supp + * ON substr(item.name, 2, 1) = substr(supp.name, 2, 1) + * AND (supp.name BETWEEN 'S1' AND 'S5') + * WHERE item.name BETWEEN 'T1' AND 'T5' + */ + "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ITEM_TABLE_FULL_NAME + "\n" + + " SERVER FILTER BY (NAME >= 'T1' AND NAME <= 'T5')\n" + + " PARALLEL LEFT-JOIN TABLE 0\n" + + " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_SUPPLIER_TABLE_FULL_NAME + "\n" + + " SERVER FILTER BY (NAME >= 'S1' AND NAME <= 'S5')", + /* + * testJoinPlanWithIndex() + * SELECT item.item_id, item.name, supp.supplier_id, supp.name + * FROM joinItemTable item INNER JOIN joinSupplierTable supp + * ON item.supplier_id = supp.supplier_id + * WHERE (item.name = 'T1' OR item.name = 'T5') + * AND (supp.name = 'S1' OR supp.name = 'S5') + */ + "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ITEM_TABLE_FULL_NAME + "\n" + + " SERVER FILTER BY (NAME = 'T1' OR NAME = 'T5')\n" + + " PARALLEL INNER-JOIN TABLE 0\n" + + " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_SUPPLIER_TABLE_FULL_NAME + "\n" + + " SERVER FILTER BY (NAME = 'S1' OR NAME = 'S5')", + /* + * 
testJoinWithSkipMergeOptimization() + * SELECT s.name FROM joinItemTable i + * JOIN joinOrderTable o ON o.item_id = i.item_id AND quantity < 5000 + * JOIN joinSupplierTable s ON i.supplier_id = s.supplier_id + */ + "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ITEM_TABLE_FULL_NAME + "\n" + + " PARALLEL INNER-JOIN TABLE 0 (SKIP MERGE)\n" + + " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ORDER_TABLE_FULL_NAME + "\n" + + " SERVER FILTER BY QUANTITY < 5000\n" + + " PARALLEL INNER-JOIN TABLE 1\n" + + " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_SUPPLIER_TABLE_FULL_NAME + "\n" + + " DYNAMIC SERVER FILTER BY \"I.item_id\" IN (\"O.item_id\")", + /* + * testSelfJoin() + * SELECT i2.item_id, i1.name FROM joinItemTable i1 + * JOIN joinItemTable i2 ON i1.item_id = i2.item_id + * ORDER BY i1.item_id + */ + "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ITEM_TABLE_FULL_NAME + "\n" + + " PARALLEL INNER-JOIN TABLE 0\n" + + " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ITEM_TABLE_FULL_NAME + "\n" + + " SERVER FILTER BY FIRST KEY ONLY\n" + + " DYNAMIC SERVER FILTER BY \"I1.item_id\" IN (\"I2.item_id\")", + /* + * testSelfJoin() + * SELECT i1.name, i2.name FROM joinItemTable i1 + * JOIN joinItemTable i2 ON i1.item_id = i2.supplier_id + * ORDER BY i1.name, i2.name + */ + "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ITEM_TABLE_FULL_NAME + "\n" + + " SERVER SORTED BY [I1.NAME, I2.NAME]\n" + + "CLIENT MERGE SORT\n" + + " PARALLEL INNER-JOIN TABLE 0\n" + + " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ITEM_TABLE_FULL_NAME + "\n" + + " DYNAMIC SERVER FILTER BY \"I1.item_id\" IN (\"I2.supplier_id\")", + /* + * testStarJoin() + * SELECT order_id, c.name, i.name iname, quantity, o.date + * FROM joinOrderTable o + * JOIN joinCustomerTable c ON o.customer_id = c.customer_id + * JOIN joinItemTable i ON o.item_id = i.item_id + * ORDER BY order_id + */ + "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ORDER_TABLE_FULL_NAME + "\n" + + " PARALLEL INNER-JOIN TABLE 0\n" + + " CLIENT 
PARALLEL 1-WAY FULL SCAN OVER " + JOIN_CUSTOMER_TABLE_FULL_NAME + "\n" + + " PARALLEL INNER-JOIN TABLE 1\n" + + " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ITEM_TABLE_FULL_NAME, + /* + * testStarJoin() + * SELECT (*NO_STAR_JOIN*) order_id, c.name, i.name iname, quantity, o.date + * FROM joinOrderTable o + * JOIN joinCustomerTable c ON o.customer_id = c.customer_id + * JOIN joinItemTable i ON o.item_id = i.item_id + * ORDER BY order_id + */ + "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ITEM_TABLE_FULL_NAME + "\n" + + " SERVER SORTED BY [\"O.order_id\"]\n" + + "CLIENT MERGE SORT\n" + + " PARALLEL INNER-JOIN TABLE 0\n" + + " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ORDER_TABLE_FULL_NAME + "\n" + + " PARALLEL INNER-JOIN TABLE 0\n" + + " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_CUSTOMER_TABLE_FULL_NAME + "\n" + + " DYNAMIC SERVER FILTER BY \"I.item_id\" IN (\"O.item_id\")", + /* + * testSubJoin() + * SELECT * FROM joinCustomerTable c + * INNER JOIN (joinOrderTable o + * INNER JOIN (joinSupplierTable s + * RIGHT JOIN joinItemTable i ON i.supplier_id = s.supplier_id) + * ON o.item_id = i.item_id) + * ON c.customer_id = o.customer_id + * WHERE c.customer_id <= '0000000005' + * AND order_id != '000000000000003' + * AND i.name != 'T3' + * ORDER BY c.customer_id, i.name + */ + "CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + JOIN_CUSTOMER_TABLE_FULL_NAME + " [*] - ['0000000005']\n" + + " SERVER SORTED BY [\"C.customer_id\", I.NAME]\n" + + "CLIENT MERGE SORT\n" + + " PARALLEL INNER-JOIN TABLE 0\n" + + " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ORDER_TABLE_FULL_NAME + "\n" + + " SERVER FILTER BY \"order_id\" != '000000000000003'\n" + + " PARALLEL INNER-JOIN TABLE 0\n" + + " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ITEM_TABLE_FULL_NAME + "\n" + + " SERVER FILTER BY NAME != 'T3'\n" + + " PARALLEL LEFT-JOIN TABLE 0\n" + + " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_SUPPLIER_TABLE_FULL_NAME + "\n" + + " DYNAMIC SERVER FILTER BY \"C.customer_id\" IN 
(\"O.customer_id\")", + /* + * testJoinWithSubqueryAndAggregation() + * SELECT i.name, sum(quantity) FROM joinOrderTable o + * LEFT JOIN (SELECT name, item_id iid FROM joinItemTable) AS i + * ON o.item_id = i.iid + * GROUP BY i.name ORDER BY i.name + */ + "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ORDER_TABLE_FULL_NAME + "\n" + + " SERVER AGGREGATE INTO DISTINCT ROWS BY [I.NAME]\n" + + "CLIENT MERGE SORT\n" + + " PARALLEL LEFT-JOIN TABLE 0\n" + + " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ITEM_TABLE_FULL_NAME, + /* + * testJoinWithSubqueryAndAggregation() + * SELECT o.iid, sum(o.quantity) q + * FROM (SELECT item_id iid, quantity FROM joinOrderTable) AS o + * LEFT JOIN (SELECT item_id FROM joinItemTable) AS i + * ON o.iid = i.item_id + * GROUP BY o.iid ORDER BY q DESC + */ + "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ORDER_TABLE_FULL_NAME + "\n" + + " SERVER AGGREGATE INTO DISTINCT ROWS BY [O.IID]\n" + + "CLIENT MERGE SORT\n" + + "CLIENT SORTED BY [SUM(O.QUANTITY) DESC]\n" + + " PARALLEL LEFT-JOIN TABLE 0 (SKIP MERGE)\n" + + " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ITEM_TABLE_FULL_NAME + "\n" + + " SERVER FILTER BY FIRST KEY ONLY", + /* + * testJoinWithSubqueryAndAggregation() + * SELECT i.iid, o.q + * FROM (SELECT item_id iid FROM joinItemTable) AS i + * LEFT JOIN (SELECT item_id iid, sum(quantity) q FROM joinOrderTable GROUP BY item_id) AS o + * ON o.iid = i.iid + * ORDER BY o.q DESC NULLS LAST, i.iid + */ + "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ITEM_TABLE_FULL_NAME + "\n" + + " SERVER FILTER BY FIRST KEY ONLY\n" + + " SERVER SORTED BY [O.Q DESC NULLS LAST, I.IID]\n" + + "CLIENT MERGE SORT\n" + + " PARALLEL LEFT-JOIN TABLE 0\n" + + " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ORDER_TABLE_FULL_NAME + "\n" + + " SERVER AGGREGATE INTO DISTINCT ROWS BY [\"item_id\"]\n" + + " CLIENT MERGE SORT", + /* + * testJoinWithSubqueryAndAggregation() + * SELECT i.iid, o.q + * FROM (SELECT item_id iid, sum(quantity) q FROM joinOrderTable GROUP BY 
item_id) AS o + * JOIN (SELECT item_id iid FROM joinItemTable) AS i + * ON o.iid = i.iid + * ORDER BY o.q DESC, i.iid + */ + "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ITEM_TABLE_FULL_NAME + "\n" + + " SERVER FILTER BY FIRST KEY ONLY\n" + + " SERVER SORTED BY [O.Q DESC, I.IID]\n" + + "CLIENT MERGE SORT\n" + + " PARALLEL INNER-JOIN TABLE 0\n" + + " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ORDER_TABLE_FULL_NAME + "\n" + + " SERVER AGGREGATE INTO DISTINCT ROWS BY [\"item_id\"]\n" + + " CLIENT MERGE SORT", + /* + * testNestedSubqueries() + * SELECT * FROM (SELECT customer_id cid, name, phone, address, loc_id, date FROM joinCustomerTable) AS c + * INNER JOIN (SELECT o.oid ooid, o.cid ocid, o.iid oiid, o.price * o.quantity, o.date odate, + * qi.iiid iiid, qi.iname iname, qi.iprice iprice, qi.idiscount1 idiscount1, qi.idiscount2 idiscount2, qi.isid isid, qi.idescription idescription, + * qi.ssid ssid, qi.sname sname, qi.sphone sphone, qi.saddress saddress, qi.sloc_id sloc_id + * FROM (SELECT item_id iid, customer_id cid, order_id oid, price, quantity, date FROM joinOrderTable) AS o + * INNER JOIN (SELECT i.iid iiid, i.name iname, i.price iprice, i.discount1 idiscount1, i.discount2 idiscount2, i.sid isid, i.description idescription, + * s.sid ssid, s.name sname, s.phone sphone, s.address saddress, s.loc_id sloc_id + * FROM (SELECT supplier_id sid, name, phone, address, loc_id FROM joinSupplierTable) AS s + * RIGHT JOIN (SELECT item_id iid, name, price, discount1, discount2, supplier_id sid, description FROM joinItemTable) AS i + * ON i.sid = s.sid) as qi + * ON o.iid = qi.iiid) as qo + * ON c.cid = qo.ocid + * WHERE c.cid <= '0000000005' + * AND qo.ooid != '000000000000003' + * AND qo.iname != 'T3' + * ORDER BY c.cid, qo.iname + */ + "CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + JOIN_CUSTOMER_TABLE_FULL_NAME + " [*] - ['0000000005']\n" + + " SERVER SORTED BY [C.CID, QO.INAME]\n" + + "CLIENT MERGE SORT\n" + + " PARALLEL INNER-JOIN TABLE 0\n" + + " CLIENT PARALLEL 
1-WAY FULL SCAN OVER " + JOIN_ORDER_TABLE_FULL_NAME + "\n" + + " SERVER FILTER BY \"order_id\" != '000000000000003'\n" + + " PARALLEL INNER-JOIN TABLE 0\n" + + " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ITEM_TABLE_FULL_NAME + "\n" + + " SERVER FILTER BY NAME != 'T3'\n" + + " PARALLEL LEFT-JOIN TABLE 0\n" + + " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_SUPPLIER_TABLE_FULL_NAME, + /* + * testJoinWithLimit() + * SELECT order_id, i.name, s.name, s.address, quantity + * FROM joinSupplierTable s + * LEFT JOIN joinItemTable i ON i.supplier_id = s.supplier_id + * LEFT JOIN joinOrderTable o ON o.item_id = i.item_id LIMIT 4 + */ + "CLIENT SERIAL 1-WAY FULL SCAN OVER " + JOIN_SUPPLIER_TABLE_FULL_NAME + "\n" + + " SERVER 4 ROW LIMIT\n" + + "CLIENT 4 ROW LIMIT\n" + + " PARALLEL LEFT-JOIN TABLE 0\n" + + " CLIENT PARALLEL 1-WAY FULL SCAN OVER "+ JOIN_ITEM_TABLE_FULL_NAME + "\n" + + " PARALLEL LEFT-JOIN TABLE 1(DELAYED EVALUATION)\n" + + " CLIENT PARALLEL 1-WAY FULL SCAN OVER "+ JOIN_ORDER_TABLE_FULL_NAME + "\n" + + " JOIN-SCANNER 4 ROW LIMIT", + /* + * testJoinWithLimit() + * SELECT order_id, i.name, s.name, s.address, quantity + * FROM joinSupplierTable s + * JOIN joinItemTable i ON i.supplier_id = s.supplier_id + * JOIN joinOrderTable o ON o.item_id = i.item_id LIMIT 4 + */ + "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_SUPPLIER_TABLE_FULL_NAME + "\n" + + "CLIENT 4 ROW LIMIT\n" + + " PARALLEL INNER-JOIN TABLE 0\n" + + " CLIENT PARALLEL 1-WAY FULL SCAN OVER "+ JOIN_ITEM_TABLE_FULL_NAME + "\n" + + " PARALLEL INNER-JOIN TABLE 1(DELAYED EVALUATION)\n" + + " CLIENT PARALLEL 1-WAY FULL SCAN OVER "+ JOIN_ORDER_TABLE_FULL_NAME + "\n" + + " DYNAMIC SERVER FILTER BY \"S.supplier_id\" IN (\"I.supplier_id\")\n" + + " JOIN-SCANNER 4 ROW LIMIT", + /* + * testJoinWithSetMaxRows() + * statement.setMaxRows(4); + * SELECT order_id, i.name, quantity FROM joinItemTable i + * JOIN joinOrderTable o ON o.item_id = i.item_id; + * SELECT o.order_id, i.name, o.quantity FROM joinItemTable i + 
* JOIN (SELECT order_id, item_id, quantity FROM joinOrderTable) o + * ON o.item_id = i.item_id; + */ + "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ITEM_TABLE_FULL_NAME + "\n" + + "CLIENT 4 ROW LIMIT\n" + + " PARALLEL INNER-JOIN TABLE 0\n" + + " CLIENT PARALLEL 1-WAY FULL SCAN OVER "+ JOIN_ORDER_TABLE_FULL_NAME + "\n" + + " DYNAMIC SERVER FILTER BY \"I.item_id\" IN (\"O.item_id\")\n" + + " JOIN-SCANNER 4 ROW LIMIT", + /* + * testJoinWithOffset() + * SELECT order_id, i.name, s.name, s.address, quantity + * FROM joinSupplierTable s + * LEFT JOIN joinItemTable i ON i.supplier_id = s.supplier_id + * LEFT JOIN joinOrderTable o ON o.item_id = i.item_id LIMIT 1 OFFSET 2 + */ + "CLIENT SERIAL 1-WAY FULL SCAN OVER " + JOIN_SUPPLIER_TABLE_FULL_NAME + "\n" + + " SERVER OFFSET 2\n" + + " SERVER 3 ROW LIMIT\n" + + "CLIENT 1 ROW LIMIT\n" + + " PARALLEL LEFT-JOIN TABLE 0\n" + + " CLIENT PARALLEL 1-WAY FULL SCAN OVER "+ JOIN_ITEM_TABLE_FULL_NAME + "\n" + + " PARALLEL LEFT-JOIN TABLE 1(DELAYED EVALUATION)\n" + + " CLIENT PARALLEL 1-WAY FULL SCAN OVER "+ JOIN_ORDER_TABLE_FULL_NAME + "\n" + + " JOIN-SCANNER 3 ROW LIMIT", + /* + * testJoinWithOffset() + * SELECT order_id, i.name, s.name, s.address, quantity + * FROM joinSupplierTable s + * JOIN joinItemTable i ON i.supplier_id = s.supplier_id + * JOIN joinOrderTable o ON o.item_id = i.item_id LIMIT 1 OFFSET 2 + */ + "CLIENT SERIAL 1-WAY FULL SCAN OVER " + JOIN_SUPPLIER_TABLE_FULL_NAME + "\n" + + " SERVER OFFSET 2\n" + + "CLIENT 1 ROW LIMIT\n" + + " PARALLEL INNER-JOIN TABLE 0\n" + + " CLIENT PARALLEL 1-WAY FULL SCAN OVER "+ JOIN_ITEM_TABLE_FULL_NAME + "\n" + + " PARALLEL INNER-JOIN TABLE 1(DELAYED EVALUATION)\n" + + " CLIENT PARALLEL 1-WAY FULL SCAN OVER "+ JOIN_ORDER_TABLE_FULL_NAME + "\n" + + " DYNAMIC SERVER FILTER BY \"S.supplier_id\" IN (\"I.supplier_id\")\n" + + " JOIN-SCANNER 3 ROW LIMIT", + }}); + return testCases; + } + + @Test + public void testInnerJoin() throws Exception { + // it involves sequences which may be 
incremented on re-try when hash + // cache is removed so this test may flap sometimes, so we don't need to + // test it for this case. + } + + @Test + public void testUpsertWithJoin() throws Exception { + // TODO: We will enable this test once PHOENIX-3163 + } + + public static class InvalidateHashCache extends SimpleRegionObserver { + public static Random rand= new Random(); + public static List<ImmutableBytesPtr> lastRemovedJoinIds=new ArrayList<ImmutableBytesPtr>(); + @Override + public RegionScanner preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan, + final RegionScanner s) throws IOException { + final HashJoinInfo joinInfo = HashJoinInfo.deserializeHashJoinFromScan(scan); + if (joinInfo != null) { + TenantCache cache = GlobalCache.getTenantCache(c.getEnvironment(), null); + int count = joinInfo.getJoinIds().length; + for (int i = 0; i < count; i++) { + ImmutableBytesPtr joinId = joinInfo.getJoinIds()[i]; + if (!ByteUtil.contains(lastRemovedJoinIds,joinId)) { + lastRemovedJoinIds.add(joinId); + cache.removeServerCache(joinId); + } + } + } + return s; + } + + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/6259055c/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java index 7bdea5f..edab319 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java @@ -25,6 +25,7 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import java.io.IOException; import java.sql.Connection; import java.sql.Date; import java.sql.DriverManager; @@ -34,23 +35,44 @@ import java.sql.ResultSetMetaData; import java.sql.SQLException; import 
java.sql.Statement; import java.sql.Timestamp; +import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Properties; - +import java.util.Random; + +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver; +import org.apache.hadoop.hbase.regionserver.RegionScanner; +import org.apache.phoenix.cache.GlobalCache; +import org.apache.phoenix.cache.ServerCacheClient; +import org.apache.phoenix.cache.TenantCache; import org.apache.phoenix.exception.SQLExceptionCode; +import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; +import org.apache.phoenix.join.HashJoinInfo; import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.QueryUtil; +import org.apache.phoenix.util.ReadOnlyProps; +import org.junit.BeforeClass; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameters; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; @RunWith(Parameterized.class) public class HashJoinIT extends BaseJoinIT { + + public HashJoinIT(String[] indexDDL, String[] plans) { super(indexDDL, plans); } @@ -1166,8 +1188,7 @@ public class HashJoinIT extends BaseJoinIT { @Test public void testDefaultJoin() throws Exception { - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - Connection conn = DriverManager.getConnection(getUrl(), props); + Connection conn = getConnection(); String tableName1 = getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME); String tableName2 = getTableName(conn, JOIN_SUPPLIER_TABLE_FULL_NAME); String query = "SELECT 
item.\"item_id\", item.name, supp.\"supplier_id\", supp.name FROM " + tableName1 + " item JOIN " + tableName2 + " supp ON item.\"supplier_id\" = supp.\"supplier_id\""; @@ -1213,8 +1234,7 @@ public class HashJoinIT extends BaseJoinIT { @Test public void testInnerJoin() throws Exception { - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - Connection conn = DriverManager.getConnection(getUrl(), props); + Connection conn = getConnection(); String tableName1 = getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME); String tableName2 = getTableName(conn, JOIN_SUPPLIER_TABLE_FULL_NAME); String query = "SELECT item.\"item_id\", item.name, supp.\"supplier_id\", supp.name, next value for " + seqName + " FROM " + tableName1 + " item INNER JOIN " + tableName2 + " supp ON item.\"supplier_id\" = supp.\"supplier_id\""; @@ -1266,8 +1286,7 @@ public class HashJoinIT extends BaseJoinIT { @Test public void testLeftJoin() throws Exception { - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - Connection conn = DriverManager.getConnection(getUrl(), props); + Connection conn = getConnection(); String tableName1 = getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME); String tableName2 = getTableName(conn, JOIN_SUPPLIER_TABLE_FULL_NAME); String query[] = new String[3]; @@ -1325,8 +1344,7 @@ public class HashJoinIT extends BaseJoinIT { @Test public void testRightJoin() throws Exception { - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - Connection conn = DriverManager.getConnection(getUrl(), props); + Connection conn = getConnection(); String tableName1 = getTableName(conn, JOIN_SUPPLIER_TABLE_FULL_NAME); String tableName2 = getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME); String query = "SELECT item.\"item_id\", item.name, supp.\"supplier_id\", supp.name FROM " + tableName1 + " supp RIGHT JOIN " + tableName2 + " item ON item.\"supplier_id\" = supp.\"supplier_id\" ORDER BY \"item_id\""; @@ -1377,8 +1395,7 @@ public class HashJoinIT extends BaseJoinIT { @Test public 
void testInnerJoinWithPreFilters() throws Exception { - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - Connection conn = DriverManager.getConnection(getUrl(), props); + Connection conn = getConnection(); String tableName1 = getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME); String tableName2 = getTableName(conn, JOIN_SUPPLIER_TABLE_FULL_NAME); String query1 = "SELECT item.\"item_id\", item.name, supp.\"supplier_id\", supp.name FROM " + tableName1 + " item INNER JOIN " + tableName2 + " supp ON item.\"supplier_id\" = supp.\"supplier_id\" AND supp.\"supplier_id\" BETWEEN '0000000001' AND '0000000005'"; @@ -1441,8 +1458,7 @@ public class HashJoinIT extends BaseJoinIT { @Test public void testLeftJoinWithPreFilters() throws Exception { - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - Connection conn = DriverManager.getConnection(getUrl(), props); + Connection conn = getConnection(); String tableName1 = getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME); String tableName2 = getTableName(conn, JOIN_SUPPLIER_TABLE_FULL_NAME); String query = "SELECT item.\"item_id\", item.name, supp.\"supplier_id\", supp.name FROM " + tableName1 + " item LEFT JOIN " + tableName2 + " supp ON item.\"supplier_id\" = supp.\"supplier_id\" AND (supp.\"supplier_id\" = '0000000001' OR supp.\"supplier_id\" = '0000000005') ORDER BY \"item_id\""; @@ -1493,8 +1509,7 @@ public class HashJoinIT extends BaseJoinIT { @Test public void testJoinWithPostFilters() throws Exception { - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - Connection conn = DriverManager.getConnection(getUrl(), props); + Connection conn = getConnection(); String tableName1 = getTableName(conn, JOIN_SUPPLIER_TABLE_FULL_NAME); String tableName2 = getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME); String query1 = "SELECT item.\"item_id\", item.name, supp.\"supplier_id\", supp.name FROM " + tableName1 + " supp RIGHT JOIN " + tableName2 + " item ON item.\"supplier_id\" = supp.\"supplier_id\" WHERE 
supp.\"supplier_id\" BETWEEN '0000000001' AND '0000000005'"; @@ -1557,8 +1572,7 @@ public class HashJoinIT extends BaseJoinIT { @Test public void testStarJoin() throws Exception { - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - Connection conn = DriverManager.getConnection(getUrl(), props); + Connection conn = getConnection(); String tableName1 = getTableName(conn, JOIN_ORDER_TABLE_FULL_NAME); String tableName2 = getTableName(conn, JOIN_CUSTOMER_TABLE_FULL_NAME); String tableName3 = getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME); @@ -1631,8 +1645,7 @@ public class HashJoinIT extends BaseJoinIT { @Test public void testLeftJoinWithAggregation() throws Exception { - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - Connection conn = DriverManager.getConnection(getUrl(), props); + Connection conn = getConnection(); String tableName1 = getTableName(conn, JOIN_ORDER_TABLE_FULL_NAME); String tableName2 = getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME); String query1 = "SELECT i.name, sum(quantity) FROM " + tableName1 + " o LEFT JOIN " @@ -1717,8 +1730,7 @@ public class HashJoinIT extends BaseJoinIT { @Test public void testRightJoinWithAggregation() throws Exception { - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - Connection conn = DriverManager.getConnection(getUrl(), props); + Connection conn = getConnection(); String tableName1 = getTableName(conn, JOIN_ORDER_TABLE_FULL_NAME); String tableName2 = getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME); String query1 = "SELECT i.name, sum(quantity) FROM " + tableName1 + " o RIGHT JOIN " @@ -1790,8 +1802,7 @@ public class HashJoinIT extends BaseJoinIT { @Test public void testLeftRightJoin() throws Exception { - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - Connection conn = DriverManager.getConnection(getUrl(), props); + Connection conn = getConnection(); String tableName1 = getTableName(conn, JOIN_ORDER_TABLE_FULL_NAME); String tableName2 = getTableName(conn, 
JOIN_ITEM_TABLE_FULL_NAME); String tableName3 = getTableName(conn, JOIN_SUPPLIER_TABLE_FULL_NAME); @@ -1896,8 +1907,7 @@ public class HashJoinIT extends BaseJoinIT { @Test public void testMultiLeftJoin() throws Exception { - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - Connection conn = DriverManager.getConnection(getUrl(), props); + Connection conn = getConnection(); String[] queries = { "SELECT \"order_id\", i.name, s.name, quantity, \"DATE\" FROM " + getTableName(conn, JOIN_ORDER_TABLE_FULL_NAME) + " o LEFT JOIN " + getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME) + " i ON o.\"item_id\" = i.\"item_id\" LEFT JOIN " @@ -1949,8 +1959,7 @@ public class HashJoinIT extends BaseJoinIT { @Test public void testMultiRightJoin() throws Exception { - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - Connection conn = DriverManager.getConnection(getUrl(), props); + Connection conn = getConnection(); String query = "SELECT \"order_id\", i.name, s.name, quantity, \"DATE\" FROM " + getTableName(conn, JOIN_ORDER_TABLE_FULL_NAME) + " o RIGHT JOIN " + getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME) + " i ON o.\"item_id\" = i.\"item_id\" RIGHT JOIN " + getTableName(conn, JOIN_SUPPLIER_TABLE_FULL_NAME) + " s ON i.\"supplier_id\" = s.\"supplier_id\" ORDER BY \"order_id\", s.\"supplier_id\" DESC"; @@ -2025,6 +2034,7 @@ public class HashJoinIT extends BaseJoinIT { public void testMultiRightJoin_SmallChunkSize() throws Exception { Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); props.setProperty(QueryServices.SCAN_RESULT_CHUNK_SIZE, "1"); + props.put(ServerCacheClient.HASH_JOIN_SERVER_CACHE_RESEND_PER_SERVER, "true"); Connection conn = DriverManager.getConnection(getUrl(), props); String query = "SELECT \"order_id\", i.name, s.name, quantity, \"DATE\" FROM " + getTableName(conn, JOIN_ORDER_TABLE_FULL_NAME) + " o RIGHT JOIN " + getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME) + " i ON o.\"item_id\" = i.\"item_id\" RIGHT JOIN " @@ -2096,8 +2106,7 @@ 
public class HashJoinIT extends BaseJoinIT { @Test public void testJoinWithWildcard() throws Exception { - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - Connection conn = DriverManager.getConnection(getUrl(), props); + Connection conn = getConnection(); String query = "SELECT * FROM " + getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME) + " LEFT JOIN " + getTableName(conn, JOIN_SUPPLIER_TABLE_FULL_NAME) + " supp ON " + getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME) + ".\"supplier_id\" = supp.\"supplier_id\" ORDER BY \"item_id\""; try { PreparedStatement statement = conn.prepareStatement(query); @@ -2205,8 +2214,7 @@ public class HashJoinIT extends BaseJoinIT { @Test public void testJoinWithTableWildcard() throws Exception { - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - Connection conn = DriverManager.getConnection(getUrl(), props); + Connection conn = getConnection(); String query = "SELECT s.*, "+ getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME) + ".*, \"order_id\" FROM " + getTableName(conn, JOIN_ORDER_TABLE_FULL_NAME) + " o RIGHT JOIN " + getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME) + " i ON o.\"item_id\" = i.\"item_id\" RIGHT JOIN " + getTableName(conn, JOIN_SUPPLIER_TABLE_FULL_NAME) + " s ON i.\"supplier_id\" = s.\"supplier_id\" ORDER BY \"order_id\", s.\"supplier_id\" DESC"; @@ -2351,8 +2359,7 @@ public class HashJoinIT extends BaseJoinIT { @Test public void testJoinMultiJoinKeys() throws Exception { - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - Connection conn = DriverManager.getConnection(getUrl(), props); + Connection conn = getConnection(); String query = "SELECT c.name, s.name FROM " + getTableName(conn, JOIN_CUSTOMER_TABLE_FULL_NAME) + " c LEFT JOIN " + getTableName(conn, JOIN_SUPPLIER_TABLE_FULL_NAME) + " s ON \"customer_id\" = \"supplier_id\" AND c.loc_id = s.loc_id AND substr(s.name, 2, 1) = substr(c.name, 2, 1)"; try { PreparedStatement statement = conn.prepareStatement(query); @@ -2384,8 +2391,7 @@ 
public class HashJoinIT extends BaseJoinIT { @Test public void testJoinWithDifferentNumericJoinKeyTypes() throws Exception { - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - Connection conn = DriverManager.getConnection(getUrl(), props); + Connection conn = getConnection(); String query = "SELECT \"order_id\", i.name, i.price, discount2, quantity FROM " + getTableName(conn, JOIN_ORDER_TABLE_FULL_NAME) + " o INNER JOIN " + getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME) + " i ON o.\"item_id\" = i.\"item_id\" AND o.price = (i.price * (100 - discount2)) / 100.0 WHERE quantity < 5000"; try { @@ -2406,8 +2412,7 @@ public class HashJoinIT extends BaseJoinIT { @Test public void testJoinWithDifferentDateJoinKeyTypes() throws Exception { - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - Connection conn = DriverManager.getConnection(getUrl(), props); + Connection conn = getConnection(); String query = "SELECT \"order_id\", c.name, o.\"DATE\" FROM " + getTableName(conn, JOIN_ORDER_TABLE_FULL_NAME) + " o INNER JOIN " + getTableName(conn, JOIN_CUSTOMER_TABLE_FULL_NAME) + " c ON o.\"customer_id\" = c.\"customer_id\" AND o.\"DATE\" = c.\"DATE\""; try { @@ -2438,8 +2443,7 @@ public class HashJoinIT extends BaseJoinIT { @Test public void testJoinWithIncomparableJoinKeyTypes() throws Exception { - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - Connection conn = DriverManager.getConnection(getUrl(), props); + Connection conn = getConnection(); String query = "SELECT \"order_id\", i.name, i.price, discount2, quantity FROM " + getTableName(conn, JOIN_ORDER_TABLE_FULL_NAME) + " o INNER JOIN " + getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME) + " i ON o.\"item_id\" = i.\"item_id\" AND o.price / 100 = substr(i.name, 2, 1)"; try { @@ -2455,8 +2459,7 @@ public class HashJoinIT extends BaseJoinIT { @Test public void testJoinPlanWithIndex() throws Exception { - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - Connection conn = 
DriverManager.getConnection(getUrl(), props); + Connection conn = getConnection(); String query1 = "SELECT item.\"item_id\", item.name, supp.\"supplier_id\", supp.name FROM " + getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME) + " item LEFT JOIN " + getTableName(conn, JOIN_SUPPLIER_TABLE_FULL_NAME) + " supp ON substr(item.name, 2, 1) = substr(supp.name, 2, 1) AND (supp.name BETWEEN 'S1' AND 'S5') WHERE item.name BETWEEN 'T1' AND 'T5'"; String query2 = "SELECT item.\"item_id\", item.name, supp.\"supplier_id\", supp.name FROM " + getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME) + " item INNER JOIN " + getTableName(conn, JOIN_SUPPLIER_TABLE_FULL_NAME) + " supp ON item.\"supplier_id\" = supp.\"supplier_id\" WHERE (item.name = 'T1' OR item.name = 'T5') AND (supp.name = 'S1' OR supp.name = 'S5')"; try { @@ -2518,8 +2521,7 @@ public class HashJoinIT extends BaseJoinIT { @Test public void testJoinWithSkipMergeOptimization() throws Exception { - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - Connection conn = DriverManager.getConnection(getUrl(), props); + Connection conn = getConnection(); String query = "SELECT s.name FROM " + getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME) + " i JOIN " + getTableName(conn, JOIN_ORDER_TABLE_FULL_NAME) + " o ON o.\"item_id\" = i.\"item_id\" AND quantity < 5000 JOIN " + getTableName(conn, JOIN_SUPPLIER_TABLE_FULL_NAME) + " s ON i.\"supplier_id\" = s.\"supplier_id\""; @@ -2546,8 +2548,7 @@ public class HashJoinIT extends BaseJoinIT { @Test public void testSelfJoin() throws Exception { - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - Connection conn = DriverManager.getConnection(getUrl(), props); + Connection conn = getConnection(); String query1 = "SELECT i2.\"item_id\", i1.name FROM " + getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME) + " i1 JOIN " + getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME) + " i2 ON i1.\"item_id\" = i2.\"item_id\" ORDER BY i1.\"item_id\""; String query2 = "SELECT i1.name, i2.name FROM " + 
getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME) + " i1 JOIN " @@ -2615,8 +2616,7 @@ public class HashJoinIT extends BaseJoinIT { @Test public void testUpsertWithJoin() throws Exception { String tempTable = generateUniqueName(); - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - Connection conn = DriverManager.getConnection(getUrl(), props); + Connection conn = getConnection(); conn.setAutoCommit(true); try { conn.createStatement().execute("CREATE TABLE " + tempTable @@ -2768,8 +2768,7 @@ public class HashJoinIT extends BaseJoinIT { @Test public void testSubJoin() throws Exception { - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - Connection conn = DriverManager.getConnection(getUrl(), props); + Connection conn = getConnection(); String query1 = "SELECT i.name, count(c.name), min(s.name), max(quantity) FROM " + getTableName(conn, JOIN_ORDER_TABLE_FULL_NAME) + " o LEFT JOIN " + "(" + getTableName(conn, JOIN_SUPPLIER_TABLE_FULL_NAME) + " s RIGHT JOIN " + getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME) + " i ON i.\"supplier_id\" = s.\"supplier_id\")" + " ON o.\"item_id\" = i.\"item_id\" LEFT JOIN " @@ -2894,8 +2893,7 @@ public class HashJoinIT extends BaseJoinIT { @Test public void testJoinWithSubquery() throws Exception { - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - Connection conn = DriverManager.getConnection(getUrl(), props); + Connection conn = getConnection(); String query1 = "SELECT item.\"item_id\", item.name, supp.sid, supp.name FROM " + getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME) + " item INNER JOIN (SELECT reverse(loc_id), \"supplier_id\" sid, name FROM " + getTableName(conn, JOIN_SUPPLIER_TABLE_FULL_NAME) + " WHERE name BETWEEN 'S1' AND 'S5') AS supp ON item.\"supplier_id\" = supp.sid"; String query2 = "SELECT item.\"item_id\", item.name, supp.\"supplier_id\", supp.name FROM " + getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME) + " item INNER JOIN (SELECT reverse(loc_id), \"supplier_id\", name FROM " + 
getTableName(conn, JOIN_SUPPLIER_TABLE_FULL_NAME) + ") AS supp ON item.\"supplier_id\" = supp.\"supplier_id\" AND (supp.name = 'S1' OR supp.name = 'S5')"; try { @@ -2956,8 +2954,7 @@ public class HashJoinIT extends BaseJoinIT { @Test public void testJoinWithSubqueryPostFilters() throws Exception { - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - Connection conn = DriverManager.getConnection(getUrl(), props); + Connection conn = getConnection(); try { String query = "SELECT item.\"item_id\", item.name, supp.\"supplier_id\", supp.name FROM " + getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME) + " item INNER JOIN (SELECT reverse(loc_id), \"supplier_id\", name FROM " + getTableName(conn, JOIN_SUPPLIER_TABLE_FULL_NAME) + " LIMIT 5) AS supp ON item.\"supplier_id\" = supp.\"supplier_id\" AND (supp.name != 'S1')"; PreparedStatement statement = conn.prepareStatement(query); @@ -3006,8 +3003,7 @@ public class HashJoinIT extends BaseJoinIT { @Test public void testJoinWithSubqueryAndAggregation() throws Exception { - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - Connection conn = DriverManager.getConnection(getUrl(), props); + Connection conn = getConnection(); String query1 = "SELECT i.name, sum(quantity) FROM " + getTableName(conn, JOIN_ORDER_TABLE_FULL_NAME) + " o LEFT JOIN (SELECT name, \"item_id\" iid FROM " + getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME) + ") AS i ON o.\"item_id\" = i.iid GROUP BY i.name ORDER BY i.name"; String query2 = "SELECT o.iid, sum(o.quantity) q FROM (SELECT \"item_id\" iid, quantity FROM " + getTableName(conn, JOIN_ORDER_TABLE_FULL_NAME) + ") AS o LEFT JOIN (SELECT \"item_id\" FROM " @@ -3112,8 +3108,7 @@ public class HashJoinIT extends BaseJoinIT { @Test public void testNestedSubqueries() throws Exception { - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - Connection conn = DriverManager.getConnection(getUrl(), props); + Connection conn = getConnection(); String query1 = "SELECT q.iname, 
count(c.name), min(q.sname), max(o.quantity) FROM (SELECT \"customer_id\" cid, \"item_id\" iid, quantity FROM " + getTableName(conn, JOIN_ORDER_TABLE_FULL_NAME) + ") AS o LEFT JOIN " + "(SELECT i.iid iid, s.name sname, i.name iname FROM (SELECT \"supplier_id\" sid, name FROM " + getTableName(conn, JOIN_SUPPLIER_TABLE_FULL_NAME) + ") AS s RIGHT JOIN (SELECT \"item_id\" iid, name, \"supplier_id\" sid FROM " + getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME) + ") AS i ON i.sid = s.sid) AS q" + " ON o.iid = q.iid LEFT JOIN (SELECT \"customer_id\" cid, name FROM " @@ -3235,8 +3230,7 @@ public class HashJoinIT extends BaseJoinIT { @Test public void testJoinWithLimit() throws Exception { - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - Connection conn = DriverManager.getConnection(getUrl(), props); + Connection conn = getConnection(); String query1 = "SELECT \"order_id\", i.name, s.name, s.address, quantity FROM " + getTableName(conn, JOIN_SUPPLIER_TABLE_FULL_NAME) + " s LEFT JOIN " + getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME) + " i ON i.\"supplier_id\" = s.\"supplier_id\" LEFT JOIN " + getTableName(conn, JOIN_ORDER_TABLE_FULL_NAME) + " o ON o.\"item_id\" = i.\"item_id\" LIMIT 4"; @@ -3314,8 +3308,7 @@ public class HashJoinIT extends BaseJoinIT { @Test public void testJoinWithOffset() throws Exception { - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - Connection conn = DriverManager.getConnection(getUrl(), props); + Connection conn = getConnection(); String query1 = "SELECT \"order_id\", i.name, s.name, s.address, quantity FROM " + getTableName(conn, JOIN_SUPPLIER_TABLE_FULL_NAME) + " s LEFT JOIN " + getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME) + " i ON i.\"supplier_id\" = s.\"supplier_id\" LEFT JOIN " + getTableName(conn, JOIN_ORDER_TABLE_FULL_NAME) + " o ON o.\"item_id\" = i.\"item_id\" LIMIT 1 OFFSET 2 "; @@ -3357,8 +3350,7 @@ public class HashJoinIT extends BaseJoinIT { @Test public void testNonEquiJoin() throws Exception { - 
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - Connection conn = DriverManager.getConnection(getUrl(), props); + Connection conn = getConnection(); try { String query = "SELECT item.name, supp.name FROM " + getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME) + " item, " + getTableName(conn, JOIN_SUPPLIER_TABLE_FULL_NAME) + " supp WHERE item.\"supplier_id\" > supp.\"supplier_id\""; PreparedStatement statement = conn.prepareStatement(query); @@ -3414,8 +3406,7 @@ public class HashJoinIT extends BaseJoinIT { @Test public void testJoinWithSetMaxRows() throws Exception { - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - Connection conn = DriverManager.getConnection(getUrl(), props); + Connection conn = getConnection(); String [] queries = new String[2]; queries[0] = "SELECT \"order_id\", i.name, quantity FROM " + getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME) + " i JOIN " + getTableName(conn, JOIN_ORDER_TABLE_FULL_NAME) + " o ON o.\"item_id\" = i.\"item_id\""; @@ -3453,4 +3444,5 @@ public class HashJoinIT extends BaseJoinIT { conn.close(); } } + } http://git-wip-us.apache.org/repos/asf/phoenix/blob/6259055c/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java index 067f50f..12630f4 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java @@ -690,7 +690,7 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT { conn.commit(); doneSignal.await(30, TimeUnit.SECONDS); // Install coprocessor that will simulate an index write failure during index rebuild - addWriteFailingCoprocessor(conn,fullIndexName); + 
TestUtil.addCoprocessor(conn,fullIndexName,WriteFailingRegionObserver.class); clock.time += WAIT_AFTER_DISABLED; doneSignal.await(30, TimeUnit.SECONDS); WAIT_FOR_REBUILD_TO_START.await(30, TimeUnit.SECONDS); @@ -843,27 +843,6 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT { t.start(); } - private static void addWriteFailingCoprocessor(Connection conn, String tableName) throws Exception { - int priority = QueryServicesOptions.DEFAULT_COPROCESSOR_PRIORITY + 100; - ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class).getQueryServices(); - HTableDescriptor descriptor = services.getTableDescriptor(Bytes.toBytes(tableName)); - descriptor.addCoprocessor(WriteFailingRegionObserver.class.getName(), null, priority, null); - int numTries = 10; - try (HBaseAdmin admin = services.getAdmin()) { - admin.modifyTable(Bytes.toBytes(tableName), descriptor); - while (!admin.getTableDescriptor(Bytes.toBytes(tableName)).equals(descriptor) - && numTries > 0) { - numTries--; - if (numTries == 0) { - throw new Exception( - "Check to detect if delaying co-processor was added failed after " - + numTries + " retries."); - } - Thread.sleep(1000); - } - } - } - private static void removeWriteFailingCoprocessor(Connection conn, String tableName) throws Exception { ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class).getQueryServices(); HTableDescriptor descriptor = services.getTableDescriptor(Bytes.toBytes(tableName)); http://git-wip-us.apache.org/repos/asf/phoenix/blob/6259055c/phoenix-core/src/it/java/org/apache/phoenix/iterate/DelayedTableResultIteratorFactory.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/iterate/DelayedTableResultIteratorFactory.java b/phoenix-core/src/it/java/org/apache/phoenix/iterate/DelayedTableResultIteratorFactory.java index 55bed91..047d067 100644 --- 
a/phoenix-core/src/it/java/org/apache/phoenix/iterate/DelayedTableResultIteratorFactory.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/iterate/DelayedTableResultIteratorFactory.java @@ -18,14 +18,17 @@ package org.apache.phoenix.iterate; import java.sql.SQLException; +import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import org.apache.hadoop.hbase.client.Scan; +import org.apache.phoenix.cache.ServerCacheClient.ServerCache; import org.apache.phoenix.compile.QueryPlan; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.exception.SQLExceptionInfo; import org.apache.phoenix.execute.MutationState; +import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.monitoring.CombinableMetric; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.tuple.Tuple; @@ -40,13 +43,13 @@ public class DelayedTableResultIteratorFactory implements TableResultIteratorFac @Override public TableResultIterator newIterator(MutationState mutationState, TableRef tableRef, Scan scan, - CombinableMetric scanMetrics, long renewLeaseThreshold, QueryPlan plan, ParallelScanGrouper scanGrouper) throws SQLException { - return new DelayedTableResultIterator(mutationState, tableRef, scan, scanMetrics, renewLeaseThreshold, plan, scanGrouper); + CombinableMetric scanMetrics, long renewLeaseThreshold, QueryPlan plan, ParallelScanGrouper scanGrouper, Map<ImmutableBytesPtr,ServerCache> caches) throws SQLException { + return new DelayedTableResultIterator(mutationState, tableRef, scan, scanMetrics, renewLeaseThreshold, plan, scanGrouper, caches); } private class DelayedTableResultIterator extends TableResultIterator { - public DelayedTableResultIterator (MutationState mutationState, TableRef tableRef, Scan scan, CombinableMetric scanMetrics, long renewLeaseThreshold, QueryPlan plan, ParallelScanGrouper scanGrouper) throws SQLException { - super(mutationState, scan, 
scanMetrics, renewLeaseThreshold, plan, scanGrouper); + public DelayedTableResultIterator (MutationState mutationState, TableRef tableRef, Scan scan, CombinableMetric scanMetrics, long renewLeaseThreshold, QueryPlan plan, ParallelScanGrouper scanGrouper, Map<ImmutableBytesPtr,ServerCache> caches) throws SQLException { + super(mutationState, scan, scanMetrics, renewLeaseThreshold, plan, scanGrouper,caches); } @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/6259055c/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java index 18e4034..ce46a3e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java @@ -21,6 +21,9 @@ import static org.apache.phoenix.monitoring.TaskExecutionMetricsHolder.NO_OP_INS import static org.apache.phoenix.util.LogUtil.addCustomAnnotations; import java.io.Closeable; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; import java.io.IOException; import java.sql.SQLException; import java.util.ArrayList; @@ -56,9 +59,10 @@ import org.apache.phoenix.coprocessor.generated.ServerCachingProtos.AddServerCac import org.apache.phoenix.coprocessor.generated.ServerCachingProtos.RemoveServerCacheRequest; import org.apache.phoenix.coprocessor.generated.ServerCachingProtos.RemoveServerCacheResponse; import org.apache.phoenix.coprocessor.generated.ServerCachingProtos.ServerCachingService; -import org.apache.phoenix.index.IndexMaintainer; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.job.JobManager.JobCallable; +import org.apache.phoenix.join.HashCacheFactory; +import org.apache.phoenix.memory.InsufficientMemoryException; 
import org.apache.phoenix.memory.MemoryManager.MemoryChunk; import org.apache.phoenix.monitoring.TaskExecutionMetricsHolder; import org.apache.phoenix.query.ConnectionQueryServices; @@ -67,6 +71,7 @@ import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTable.IndexType; import org.apache.phoenix.schema.TableRef; +import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.Closeables; import org.apache.phoenix.util.SQLCloseable; import org.apache.phoenix.util.SQLCloseables; @@ -86,6 +91,7 @@ public class ServerCacheClient { public static final byte[] KEY_IN_FIRST_REGION = new byte[]{0}; private static final Log LOG = LogFactory.getLog(ServerCacheClient.class); private static final Random RANDOM = new Random(); + public static final String HASH_JOIN_SERVER_CACHE_RESEND_PER_SERVER = "hash.join.server.cache.resend.per.server"; private final PhoenixConnection connection; private final Map<Integer, TableRef> cacheUsingTableRefMap = new ConcurrentHashMap<Integer, TableRef>(); @@ -115,12 +121,41 @@ public class ServerCacheClient { public class ServerCache implements SQLCloseable { private final int size; private final byte[] id; - private final ImmutableSet<HRegionLocation> servers; + private final Set<HRegionLocation> servers; + private ImmutableBytesWritable cachePtr; + private MemoryChunk chunk; + private File outputFile; - public ServerCache(byte[] id, Set<HRegionLocation> servers, int size) { + + public ServerCache(byte[] id, Set<HRegionLocation> servers, ImmutableBytesWritable cachePtr, + ConnectionQueryServices services, boolean storeCacheOnClient) throws IOException { this.id = id; - this.servers = ImmutableSet.copyOf(servers); - this.size = size; + this.servers = new HashSet<HRegionLocation>(servers); + this.size = cachePtr.getLength(); + if (storeCacheOnClient) { + try { + this.chunk = services.getMemoryManager().allocate(cachePtr.getLength()); + this.cachePtr = cachePtr; + } 
catch (InsufficientMemoryException e) { + this.outputFile = File.createTempFile("HashJoinCacheSpooler", ".bin", new File(services.getProps() + .get(QueryServices.SPOOL_DIRECTORY, QueryServicesOptions.DEFAULT_SPOOL_DIRECTORY))); + try (FileOutputStream fio = new FileOutputStream(outputFile)) { + fio.write(cachePtr.get(), cachePtr.getOffset(), cachePtr.getLength()); + } + } + } + + } + + public ImmutableBytesWritable getCachePtr() throws IOException { + if(this.outputFile!=null){ + try (FileInputStream fio = new FileInputStream(outputFile)) { + byte[] b = new byte[this.size]; + fio.read(b); + cachePtr = new ImmutableBytesWritable(b); + } + } + return cachePtr; } /** @@ -136,22 +171,41 @@ public class ServerCacheClient { public byte[] getId() { return id; } - + + public boolean addServer(HRegionLocation loc) { + return this.servers.add(loc); + } + /** * Call to free up cache on region servers when no longer needed */ @Override public void close() throws SQLException { - removeServerCache(id, servers); + try{ + removeServerCache(this, servers); + }finally{ + cachePtr = null; + if (chunk != null) { + chunk.close(); + } + if (outputFile != null) { + outputFile.delete(); + } + } } - + + } + + public ServerCache addServerCache(ScanRanges keyRanges, final ImmutableBytesWritable cachePtr, final byte[] txState, + final ServerCacheFactory cacheFactory, final TableRef cacheUsingTableRef) throws SQLException { + return addServerCache(keyRanges, cachePtr, txState, cacheFactory, cacheUsingTableRef, false); } - public ServerCache addServerCache(ScanRanges keyRanges, final ImmutableBytesWritable cachePtr, final byte[] txState, final ServerCacheFactory cacheFactory, final TableRef cacheUsingTableRef) throws SQLException { + public ServerCache addServerCache(ScanRanges keyRanges, final ImmutableBytesWritable cachePtr, final byte[] txState, + final ServerCacheFactory cacheFactory, final TableRef cacheUsingTableRef, boolean storeCacheOnClient) + throws SQLException { 
ConnectionQueryServices services = connection.getQueryServices(); - MemoryChunk chunk = services.getMemoryManager().allocate(cachePtr.getLength()); List<Closeable> closeables = new ArrayList<Closeable>(); - closeables.add(chunk); ServerCache hashCacheSpec = null; SQLException firstException = null; final byte[] cacheId = generateId(); @@ -187,54 +241,7 @@ public class ServerCacheClient { @Override public Boolean call() throws Exception { - final Map<byte[], AddServerCacheResponse> results; - try { - results = htable.coprocessorService(ServerCachingService.class, key, key, - new Batch.Call<ServerCachingService, AddServerCacheResponse>() { - @Override - public AddServerCacheResponse call(ServerCachingService instance) throws IOException { - ServerRpcController controller = new ServerRpcController(); - BlockingRpcCallback<AddServerCacheResponse> rpcCallback = - new BlockingRpcCallback<AddServerCacheResponse>(); - AddServerCacheRequest.Builder builder = AddServerCacheRequest.newBuilder(); - final byte[] tenantIdBytes; - if(cacheUsingTable.isMultiTenant()) { - try { - tenantIdBytes = connection.getTenantId() == null ? null : - ScanUtil.getTenantIdBytes( - cacheUsingTable.getRowKeySchema(), - cacheUsingTable.getBucketNum() != null, - connection.getTenantId(), cacheUsingTable.getViewIndexId() != null); - } catch (SQLException e) { - throw new IOException(e); - } - } else { - tenantIdBytes = connection.getTenantId() == null ? 
null : connection.getTenantId().getBytes(); - } - if (tenantIdBytes != null) { - builder.setTenantId(ByteStringer.wrap(tenantIdBytes)); - } - builder.setCacheId(ByteStringer.wrap(cacheId)); - builder.setCachePtr(org.apache.phoenix.protobuf.ProtobufUtil.toProto(cachePtr)); - builder.setHasProtoBufIndexMaintainer(true); - ServerCacheFactoryProtos.ServerCacheFactory.Builder svrCacheFactoryBuider = ServerCacheFactoryProtos.ServerCacheFactory.newBuilder(); - svrCacheFactoryBuider.setClassName(cacheFactory.getClass().getName()); - builder.setCacheFactory(svrCacheFactoryBuider.build()); - builder.setTxState(ByteStringer.wrap(txState)); - instance.addServerCache(controller, builder.build(), rpcCallback); - if(controller.getFailedOn() != null) { - throw controller.getFailedOn(); - } - return rpcCallback.get(); - } - }); - } catch (Throwable t) { - throw new Exception(t); - } - if(results != null && results.size() == 1){ - return results.values().iterator().next().getReturn(); - } - return false; + return addServerCache(htable, key, cacheUsingTable, cacheId, cachePtr, cacheFactory, txState); } /** @@ -257,7 +264,7 @@ public class ServerCacheClient { } } - hashCacheSpec = new ServerCache(cacheId,servers,cachePtr.getLength()); + hashCacheSpec = new ServerCache(cacheId,servers,cachePtr, services, storeCacheOnClient); // Execute in parallel int timeoutMs = services.getProps().getInt(QueryServices.THREAD_TIMEOUT_MS_ATTRIB, QueryServicesOptions.DEFAULT_THREAD_TIMEOUT_MS); for (Future<Boolean> future : futures) { @@ -303,73 +310,81 @@ public class ServerCacheClient { * @throws SQLException * @throws IllegalStateException if hashed table cannot be removed on any region server on which it was added */ - private void removeServerCache(final byte[] cacheId, Set<HRegionLocation> servers) throws SQLException { - ConnectionQueryServices services = connection.getQueryServices(); - Throwable lastThrowable = null; - TableRef cacheUsingTableRef = 
cacheUsingTableRefMap.get(Bytes.mapKey(cacheId)); - final PTable cacheUsingTable = cacheUsingTableRef.getTable(); - byte[] tableName = cacheUsingTableRef.getTable().getPhysicalName().getBytes(); - HTableInterface iterateOverTable = services.getTable(tableName); - try { - List<HRegionLocation> locations = services.getAllTableRegions(tableName); - Set<HRegionLocation> remainingOnServers = new HashSet<HRegionLocation>(servers); - /** - * Allow for the possibility that the region we based where to send our cache has split and been - * relocated to another region server *after* we sent it, but before we removed it. To accommodate - * this, we iterate through the current metadata boundaries and remove the cache once for each - * server that we originally sent to. - */ - if (LOG.isDebugEnabled()) {LOG.debug(addCustomAnnotations("Removing Cache " + cacheId + " from servers.", connection));} - for (HRegionLocation entry : locations) { - if (remainingOnServers.contains(entry)) { // Call once per server - try { + private void removeServerCache(final ServerCache cache, Set<HRegionLocation> remainingOnServers) throws SQLException { + HTableInterface iterateOverTable = null; + final byte[] cacheId = cache.getId(); + try { + ConnectionQueryServices services = connection.getQueryServices(); + Throwable lastThrowable = null; + TableRef cacheUsingTableRef = cacheUsingTableRefMap.get(Bytes.mapKey(cacheId)); + final PTable cacheUsingTable = cacheUsingTableRef.getTable(); + byte[] tableName = cacheUsingTableRef.getTable().getPhysicalName().getBytes(); + iterateOverTable = services.getTable(tableName); + + List<HRegionLocation> locations = services.getAllTableRegions(tableName); + /** + * Allow for the possibility that the region we based where to send our cache has split and been relocated + * to another region server *after* we sent it, but before we removed it. 
To accommodate this, we iterate + * through the current metadata boundaries and remove the cache once for each server that we originally sent + * to. + */ + if (LOG.isDebugEnabled()) { + LOG.debug(addCustomAnnotations("Removing Cache " + cacheId + " from servers.", connection)); + } + for (HRegionLocation entry : locations) { + // Call once per server + if (remainingOnServers.contains(entry)) { + try { byte[] key = getKeyInRegion(entry.getRegionInfo().getStartKey()); - iterateOverTable.coprocessorService(ServerCachingService.class, key, key, - new Batch.Call<ServerCachingService, RemoveServerCacheResponse>() { - @Override - public RemoveServerCacheResponse call(ServerCachingService instance) throws IOException { - ServerRpcController controller = new ServerRpcController(); - BlockingRpcCallback<RemoveServerCacheResponse> rpcCallback = - new BlockingRpcCallback<RemoveServerCacheResponse>(); - RemoveServerCacheRequest.Builder builder = RemoveServerCacheRequest.newBuilder(); - final byte[] tenantIdBytes; - if(cacheUsingTable.isMultiTenant()) { - try { - tenantIdBytes = connection.getTenantId() == null ? 
null : - ScanUtil.getTenantIdBytes( - cacheUsingTable.getRowKeySchema(), - cacheUsingTable.getBucketNum() != null, - connection.getTenantId(), cacheUsingTable.getViewIndexId() != null); - } catch (SQLException e) { - throw new IOException(e); + iterateOverTable.coprocessorService(ServerCachingService.class, key, key, + new Batch.Call<ServerCachingService, RemoveServerCacheResponse>() { + @Override + public RemoveServerCacheResponse call(ServerCachingService instance) + throws IOException { + ServerRpcController controller = new ServerRpcController(); + BlockingRpcCallback<RemoveServerCacheResponse> rpcCallback = new BlockingRpcCallback<RemoveServerCacheResponse>(); + RemoveServerCacheRequest.Builder builder = RemoveServerCacheRequest + .newBuilder(); + final byte[] tenantIdBytes; + if (cacheUsingTable.isMultiTenant()) { + try { + tenantIdBytes = connection.getTenantId() == null ? null + : ScanUtil.getTenantIdBytes(cacheUsingTable.getRowKeySchema(), + cacheUsingTable.getBucketNum() != null, + connection.getTenantId(), + cacheUsingTable.getViewIndexId() != null); + } catch (SQLException e) { + throw new IOException(e); + } + } else { + tenantIdBytes = connection.getTenantId() == null ? null + : connection.getTenantId().getBytes(); + } + if (tenantIdBytes != null) { + builder.setTenantId(ByteStringer.wrap(tenantIdBytes)); + } + builder.setCacheId(ByteStringer.wrap(cacheId)); + instance.removeServerCache(controller, builder.build(), rpcCallback); + if (controller.getFailedOn() != null) { throw controller.getFailedOn(); } + return rpcCallback.get(); } - } else { - tenantIdBytes = connection.getTenantId() == null ? 
null : connection.getTenantId().getBytes(); - } - if (tenantIdBytes != null) { - builder.setTenantId(ByteStringer.wrap(tenantIdBytes)); - } - builder.setCacheId(ByteStringer.wrap(cacheId)); - instance.removeServerCache(controller, builder.build(), rpcCallback); - if(controller.getFailedOn() != null) { - throw controller.getFailedOn(); - } - return rpcCallback.get(); - } - }); - remainingOnServers.remove(entry); - } catch (Throwable t) { - lastThrowable = t; - LOG.error(addCustomAnnotations("Error trying to remove hash cache for " + entry, connection), t); - } - } - } - if (!remainingOnServers.isEmpty()) { - LOG.warn(addCustomAnnotations("Unable to remove hash cache for " + remainingOnServers, connection), lastThrowable); - } - } finally { - Closeables.closeQuietly(iterateOverTable); - } + }); + remainingOnServers.remove(entry); + } catch (Throwable t) { + lastThrowable = t; + LOG.error(addCustomAnnotations("Error trying to remove hash cache for " + entry, connection), + t); + } + } + } + if (!remainingOnServers.isEmpty()) { + LOG.warn(addCustomAnnotations("Unable to remove hash cache for " + remainingOnServers, connection), + lastThrowable); + } + } finally { + cacheUsingTableRefMap.remove(cacheId); + Closeables.closeQuietly(iterateOverTable); + } } /** @@ -394,4 +409,77 @@ public class ServerCacheClient { } return regionStartKey; } + + public boolean addServerCache(byte[] startkeyOfRegion, ServerCache cache, HashCacheFactory cacheFactory, + byte[] txState, PTable pTable) throws Exception { + HTableInterface table = null; + boolean success = true; + byte[] cacheId = cache.getId(); + try { + ConnectionQueryServices services = connection.getQueryServices(); + + byte[] tableName = pTable.getPhysicalName().getBytes(); + table = services.getTable(tableName); + HRegionLocation tableRegionLocation = services.getTableRegionLocation(tableName, startkeyOfRegion); + if (cache.addServer(tableRegionLocation) || 
services.getProps().getBoolean(HASH_JOIN_SERVER_CACHE_RESEND_PER_SERVER,false)) { + success = addServerCache(table, startkeyOfRegion, pTable, cacheId, cache.getCachePtr(), cacheFactory, + txState); + } + return success; + } finally { + Closeables.closeQuietly(table); + } + } + + public boolean addServerCache(HTableInterface htable, byte[] key, final PTable cacheUsingTable, final byte[] cacheId, + final ImmutableBytesWritable cachePtr, final ServerCacheFactory cacheFactory, final byte[] txState) + throws Exception { + byte[] keyInRegion = getKeyInRegion(key); + final Map<byte[], AddServerCacheResponse> results; + try { + results = htable.coprocessorService(ServerCachingService.class, keyInRegion, keyInRegion, + new Batch.Call<ServerCachingService, AddServerCacheResponse>() { + @Override + public AddServerCacheResponse call(ServerCachingService instance) throws IOException { + ServerRpcController controller = new ServerRpcController(); + BlockingRpcCallback<AddServerCacheResponse> rpcCallback = new BlockingRpcCallback<AddServerCacheResponse>(); + AddServerCacheRequest.Builder builder = AddServerCacheRequest.newBuilder(); + final byte[] tenantIdBytes; + if (cacheUsingTable.isMultiTenant()) { + try { + tenantIdBytes = connection.getTenantId() == null ? null + : ScanUtil.getTenantIdBytes(cacheUsingTable.getRowKeySchema(), + cacheUsingTable.getBucketNum() != null, connection.getTenantId(), + cacheUsingTable.getViewIndexId() != null); + } catch (SQLException e) { + throw new IOException(e); + } + } else { + tenantIdBytes = connection.getTenantId() == null ? 
null + : connection.getTenantId().getBytes(); + } + if (tenantIdBytes != null) { + builder.setTenantId(ByteStringer.wrap(tenantIdBytes)); + } + builder.setCacheId(ByteStringer.wrap(cacheId)); + builder.setCachePtr(org.apache.phoenix.protobuf.ProtobufUtil.toProto(cachePtr)); + builder.setHasProtoBufIndexMaintainer(true); + ServerCacheFactoryProtos.ServerCacheFactory.Builder svrCacheFactoryBuider = ServerCacheFactoryProtos.ServerCacheFactory + .newBuilder(); + svrCacheFactoryBuider.setClassName(cacheFactory.getClass().getName()); + builder.setCacheFactory(svrCacheFactoryBuider.build()); + builder.setTxState(ByteStringer.wrap(txState)); + instance.addServerCache(controller, builder.build(), rpcCallback); + if (controller.getFailedOn() != null) { throw controller.getFailedOn(); } + return rpcCallback.get(); + } + }); + } catch (Throwable t) { + throw new Exception(t); + } + if (results != null && results.size() == 1) { return results.values().iterator().next().getReturn(); } + return false; + + } + }
