This is an automated email from the ASF dual-hosted git repository. skadam pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push: new 5bcd1ce PHOENIX-5757 IndexUpgrade tool set Phoenix query and Hbase rpc timeouts 5bcd1ce is described below commit 5bcd1ce19d6eda842d933c436623e329e2859e70 Author: Tanuj Khurana <khurana.ta...@gmail.com> AuthorDate: Tue Mar 3 18:19:12 2020 -0800 PHOENIX-5757 IndexUpgrade tool set Phoenix query and Hbase rpc timeouts Signed-off-by: s.kadam <s.ka...@apache.org> --- .../phoenix/mapreduce/index/IndexUpgradeTool.java | 43 ++++++++++-- .../apache/phoenix/index/IndexUpgradeToolTest.java | 79 ++++++++++++++++++++++ 2 files changed, 116 insertions(+), 6 deletions(-) diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexUpgradeTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexUpgradeTool.java index ccf5a5f..7ed24dc 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexUpgradeTool.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexUpgradeTool.java @@ -27,6 +27,7 @@ import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.CoprocessorDescriptorBuilder; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.Tool; @@ -46,6 +47,7 @@ import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.mapreduce.util.ConnectionUtil; import org.apache.phoenix.query.ConnectionQueryServices; +import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.PIndexState; import org.apache.phoenix.schema.PTable; @@ -314,12 +316,39 @@ public class IndexUpgradeTool extends Configured implements Tool { } } + private static void setRpcRetriesAndTimeouts(Configuration conf) { + long indexRebuildQueryTimeoutMs = + conf.getLong(QueryServices.INDEX_REBUILD_QUERY_TIMEOUT_ATTRIB, + QueryServicesOptions.DEFAULT_INDEX_REBUILD_QUERY_TIMEOUT); + long indexRebuildRPCTimeoutMs = + conf.getLong(QueryServices.INDEX_REBUILD_RPC_TIMEOUT_ATTRIB, + QueryServicesOptions.DEFAULT_INDEX_REBUILD_RPC_TIMEOUT); + long indexRebuildClientScannerTimeOutMs = + conf.getLong(QueryServices.INDEX_REBUILD_CLIENT_SCANNER_TIMEOUT_ATTRIB, + QueryServicesOptions.DEFAULT_INDEX_REBUILD_CLIENT_SCANNER_TIMEOUT); + int indexRebuildRpcRetriesCounter = + conf.getInt(QueryServices.INDEX_REBUILD_RPC_RETRIES_COUNTER, + QueryServicesOptions.DEFAULT_INDEX_REBUILD_RPC_RETRIES_COUNTER); + + // Set phoenix and hbase level timeouts and rpc retries + conf.setLong(QueryServices.THREAD_TIMEOUT_MS_ATTRIB, indexRebuildQueryTimeoutMs); + conf.setLong(HConstants.HBASE_RPC_TIMEOUT_KEY, indexRebuildRPCTimeoutMs); + conf.setLong(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, + indexRebuildClientScannerTimeOutMs); + conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, indexRebuildRpcRetriesCounter); + } + + @VisibleForTesting + public static Connection getConnection(Configuration conf) throws SQLException { + setRpcRetriesAndTimeouts(conf); + return ConnectionUtil.getInputConnection(conf); + } + @VisibleForTesting public int executeTool() { Configuration conf = HBaseConfiguration.addHbaseResources(getConf()); - try (Connection conn = ConnectionUtil.getInputConnection(conf)) { - + try (Connection conn = getConnection(conf)) { ConnectionQueryServices queryServices = conn.unwrap(PhoenixConnection.class) .getQueryServices(); @@ -785,9 +814,10 @@ public class IndexUpgradeTool extends Configured implements Tool { String viewIndexesSql = getViewIndexesSql(viewName, schemaName, tenantId); ArrayList<String> viewIndexes = new ArrayList<>(); - ResultSet - rs = - conn.createStatement().executeQuery(viewIndexesSql); + long stime = EnvironmentEdgeManager.currentTimeMillis(); + ResultSet rs = conn.createStatement().executeQuery(viewIndexesSql); + long etime = EnvironmentEdgeManager.currentTimeMillis(); + LOGGER.info(String.format("Query %s took %d ms ", viewIndexesSql, (etime - stime))); while(rs.next()) { String viewIndexName = rs.getString(1); viewIndexes.add(viewIndexName); @@ -803,7 +833,8 @@ public class IndexUpgradeTool extends Configured implements Tool { + (!Strings.isNullOrEmpty(schemaName) ? " AND TABLE_SCHEM = \'" + schemaName + "\'" : "") + " AND LINK_TYPE = " + PTable.LinkType.INDEX_TABLE.getSerializedValue() - + (tenantId != null ? " AND TENANT_ID = \'" + tenantId + "\'" : ""); + + (tenantId != null ? + " AND TENANT_ID = \'" + tenantId + "\'" : " AND TENANT_ID IS NULL"); } private class IndexInfo { diff --git a/phoenix-core/src/test/java/org/apache/phoenix/index/IndexUpgradeToolTest.java b/phoenix-core/src/test/java/org/apache/phoenix/index/IndexUpgradeToolTest.java index facd9ee..9554aff 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/index/IndexUpgradeToolTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/index/IndexUpgradeToolTest.java @@ -20,13 +20,22 @@ package org.apache.phoenix.index; import static org.apache.phoenix.mapreduce.index.IndexUpgradeTool.ROLLBACK_OP; import static org.apache.phoenix.mapreduce.index.IndexUpgradeTool.UPGRADE_OP; +import java.sql.Connection; import java.util.Arrays; import java.util.Collection; import java.util.List; import java.util.UUID; import org.apache.commons.cli.CommandLine; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; + import org.apache.phoenix.mapreduce.index.IndexUpgradeTool; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.query.QueryServicesOptions; +import org.apache.phoenix.util.PhoenixRuntime; + import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -34,6 +43,7 @@ import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameters; + @RunWith(Parameterized.class) public class IndexUpgradeToolTest { private static final String INPUT_LIST = "TEST.MOCK1,TEST1.MOCK2,TEST.MOCK3"; @@ -86,4 +96,73 @@ public class IndexUpgradeToolTest { return Arrays.asList( false, true); } + private void setupConfForConnectionlessQuery(Configuration conf) { + conf.set(HConstants.ZOOKEEPER_QUORUM, PhoenixRuntime.CONNECTIONLESS); + conf.unset(HConstants.ZOOKEEPER_CLIENT_PORT); + conf.unset(HConstants.ZOOKEEPER_ZNODE_PARENT); + } + + @Test + public void testConnectionProperties() throws Exception { + Configuration conf = HBaseConfiguration.create(); + + long indexRebuildQueryTimeoutMs = 2000; + long indexRebuildRpcTimeoutMs = 3000; + long indexRebuildClientScannerTimeoutMs = 4000; + int indexRebuildRpcRetryCount = 10; + + conf.setLong(QueryServices.INDEX_REBUILD_QUERY_TIMEOUT_ATTRIB, indexRebuildQueryTimeoutMs); + conf.setLong(QueryServices.INDEX_REBUILD_RPC_TIMEOUT_ATTRIB, indexRebuildRpcTimeoutMs); + conf.setLong(QueryServices.INDEX_REBUILD_CLIENT_SCANNER_TIMEOUT_ATTRIB, + indexRebuildClientScannerTimeoutMs); + conf.setInt(QueryServices.INDEX_REBUILD_RPC_RETRIES_COUNTER, indexRebuildRpcRetryCount); + + // prepare conf for connectionless query + setupConfForConnectionlessQuery(conf); + + try (Connection conn = IndexUpgradeTool.getConnection(conf)) { + // verify connection properties for phoenix, hbase timeouts and retries + Assert.assertEquals(conn.getClientInfo(QueryServices.THREAD_TIMEOUT_MS_ATTRIB), + Long.toString(indexRebuildQueryTimeoutMs)); + Assert.assertEquals(conn.getClientInfo(HConstants.HBASE_RPC_TIMEOUT_KEY), + Long.toString(indexRebuildRpcTimeoutMs)); + Assert.assertEquals(conn.getClientInfo(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD), + Long.toString(indexRebuildClientScannerTimeoutMs)); + Assert.assertEquals(conn.getClientInfo(HConstants.HBASE_CLIENT_RETRIES_NUMBER), + Long.toString(indexRebuildRpcRetryCount)); + } + } + + @Test + public void testConnectionDefaults() throws Exception { + Configuration conf = HBaseConfiguration.create(); + + long indexRebuildQueryTimeoutMs = conf.getLong( + QueryServices.INDEX_REBUILD_QUERY_TIMEOUT_ATTRIB, + QueryServicesOptions.DEFAULT_INDEX_REBUILD_QUERY_TIMEOUT); + long indexRebuildRpcTimeoutMs = conf.getLong( + QueryServices.INDEX_REBUILD_RPC_TIMEOUT_ATTRIB, + QueryServicesOptions.DEFAULT_INDEX_REBUILD_RPC_TIMEOUT); + long indexRebuildClientScannerTimeoutMs = conf.getLong( + QueryServices.INDEX_REBUILD_CLIENT_SCANNER_TIMEOUT_ATTRIB, + QueryServicesOptions.DEFAULT_INDEX_REBUILD_CLIENT_SCANNER_TIMEOUT); + long indexRebuildRpcRetryCount = conf.getInt( + QueryServices.INDEX_REBUILD_RPC_RETRIES_COUNTER, + QueryServicesOptions.DEFAULT_INDEX_REBUILD_RPC_RETRIES_COUNTER); + + // prepare conf for connectionless query + setupConfForConnectionlessQuery(conf); + + try (Connection conn = IndexUpgradeTool.getConnection(conf)) { + // verify connection properties for phoenix, hbase timeouts and retries + Assert.assertEquals(conn.getClientInfo(QueryServices.THREAD_TIMEOUT_MS_ATTRIB), + Long.toString(indexRebuildQueryTimeoutMs)); + Assert.assertEquals(conn.getClientInfo(HConstants.HBASE_RPC_TIMEOUT_KEY), + Long.toString(indexRebuildRpcTimeoutMs)); + Assert.assertEquals(conn.getClientInfo(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD), + Long.toString(indexRebuildClientScannerTimeoutMs)); + Assert.assertEquals(conn.getClientInfo(HConstants.HBASE_CLIENT_RETRIES_NUMBER), + Long.toString(indexRebuildRpcRetryCount)); + } + } }