Repository: phoenix
Updated Branches:
  refs/heads/4.x-HBase-1.1 cdc77dfcb -> 861c30402
PHOENIX-3584 Expose metrics for ConnectionQueryServices instances and their allocators in the JVM Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/861c3040 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/861c3040 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/861c3040 Branch: refs/heads/4.x-HBase-1.1 Commit: 861c30402abb56d2ff765a924dc77f6efae97cab Parents: cdc77df Author: Samarth <samarth.j...@salesforce.com> Authored: Tue Jan 10 17:36:17 2017 -0800 Committer: Samarth <samarth.j...@salesforce.com> Committed: Tue Jan 10 17:36:17 2017 -0800 ---------------------------------------------------------------------- .../phoenix/monitoring/PhoenixMetricsIT.java | 135 +++++++++++++++++++ .../phoenix/monitoring/GlobalClientMetrics.java | 6 +- .../apache/phoenix/monitoring/MetricType.java | 4 +- .../query/ConnectionQueryServicesImpl.java | 10 ++ 4 files changed, 153 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/861c3040/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java index 3af8ce7..16a66df 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java @@ -10,12 +10,14 @@ package org.apache.phoenix.monitoring; import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_FAILED_QUERY_COUNTER; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HCONNECTIONS_COUNTER; import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_BATCH_SIZE; import static 
org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_BYTES; import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_COMMIT_TIME; import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_SQL_COUNTER; import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_NUM_PARALLEL_SCANS; import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_OPEN_PHOENIX_CONNECTIONS; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_QUERY_SERVICES_COUNTER; import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_QUERY_TIME; import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_QUERY_TIMEOUT_COUNTER; import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_REJECTED_TASK_COUNTER; @@ -28,6 +30,7 @@ import static org.apache.phoenix.monitoring.MetricType.MEMORY_CHUNK_BYTES; import static org.apache.phoenix.monitoring.MetricType.SCAN_BYTES; import static org.apache.phoenix.monitoring.MetricType.TASK_EXECUTED_COUNTER; import static org.apache.phoenix.monitoring.MetricType.TASK_EXECUTION_TIME; +import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB; import static org.apache.phoenix.util.PhoenixRuntime.UPSERT_BATCH_SIZE_ATTRIB; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -44,10 +47,16 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Properties; import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.phoenix.compile.StatementContext; import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT; import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.jdbc.PhoenixDriver; import org.apache.phoenix.jdbc.PhoenixResultSet; import 
org.apache.phoenix.query.QueryServices; import org.apache.phoenix.util.PhoenixRuntime; @@ -67,6 +76,8 @@ public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT { .newArrayList(MetricType.MUTATION_COMMIT_TIME.name()); private static final List<String> readMetricsToSkip = Lists.newArrayList(MetricType.TASK_QUEUE_WAIT_TIME.name(), MetricType.TASK_EXECUTION_TIME.name(), MetricType.TASK_END_TO_END_TIME.name()); + private static final String CUSTOM_URL_STRING = "SESSION"; + private static final AtomicInteger numConnections = new AtomicInteger(0); @BeforeClass public static void doSetup() throws Exception { @@ -76,6 +87,8 @@ public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT { // disable renewing leases as this will force spooling to happen. props.put(QueryServices.RENEW_LEASE_ENABLED, String.valueOf(false)); setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); + // need the non-test driver for some tests that check number of hconnections, etc. + DriverManager.registerDriver(PhoenixDriver.INSTANCE); } @Test @@ -827,5 +840,127 @@ public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT { } } } + + @Test + public void testGetConnectionsForSameUrlConcurrently() throws Exception { + // establish url and quorum. Need to use PhoenixDriver and not PhoenixTestDriver + String zkQuorum = "localhost:" + getUtility().getZkCluster().getClientPort(); + String url = PhoenixRuntime.JDBC_PROTOCOL + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkQuorum; + ExecutorService exec = Executors.newFixedThreadPool(10); + try { + GLOBAL_HCONNECTIONS_COUNTER.getMetric().reset(); + GLOBAL_QUERY_SERVICES_COUNTER.getMetric().reset(); + assertEquals(0, GLOBAL_HCONNECTIONS_COUNTER.getMetric().getValue()); + assertEquals(0, GLOBAL_QUERY_SERVICES_COUNTER.getMetric().getValue()); + List<Callable<Connection>> callables = new ArrayList<>(100); + List<Future<Connection>> futures = new ArrayList<>(100); + int expectedHConnections = numConnections.get() > 0 ? 
0 : 1; + for (int i = 1; i <= 100; i++) { + Callable<Connection> c = new GetConnectionCallable(url); + callables.add(c); + futures.add(exec.submit(c)); + } + for (int i = 0; i < futures.size(); i++) { + Connection c = futures.get(i).get(); + try { + c.close(); + } catch (Exception ignore) {} + } + assertEquals(expectedHConnections, GLOBAL_HCONNECTIONS_COUNTER.getMetric().getValue()); + assertEquals(expectedHConnections, GLOBAL_QUERY_SERVICES_COUNTER.getMetric().getValue()); + } finally { + exec.shutdownNow(); + } + } + + @Test + public void testGetConnectionsForDifferentTenantsConcurrently() throws Exception { + // establish url and quorum. Need to use PhoenixDriver and not PhoenixTestDriver + String zkQuorum = "localhost:" + getUtility().getZkCluster().getClientPort(); + String url = PhoenixRuntime.JDBC_PROTOCOL + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkQuorum; + ExecutorService exec = Executors.newFixedThreadPool(10); + try { + GLOBAL_HCONNECTIONS_COUNTER.getMetric().reset(); + GLOBAL_QUERY_SERVICES_COUNTER.getMetric().reset(); + assertEquals(0, GLOBAL_HCONNECTIONS_COUNTER.getMetric().getValue()); + assertEquals(0, GLOBAL_QUERY_SERVICES_COUNTER.getMetric().getValue()); + int expectedHConnections = numConnections.get() > 0 ? 
0 : 1; + List<Callable<Connection>> callables = new ArrayList<>(100); + List<Future<Connection>> futures = new ArrayList<>(100); + for (int i = 1; i <= 100; i++) { + String tenantUrl = url + ';' + TENANT_ID_ATTRIB + '=' + i; + Callable<Connection> c = new GetConnectionCallable(tenantUrl + ";"); + callables.add(c); + futures.add(exec.submit(c)); + } + for (int i = 0; i < futures.size(); i++) { + Connection c = futures.get(i).get(); + try { + c.close(); + } catch (Exception ignore) {} + } + assertEquals(expectedHConnections, GLOBAL_HCONNECTIONS_COUNTER.getMetric().getValue()); + assertEquals(expectedHConnections, GLOBAL_QUERY_SERVICES_COUNTER.getMetric().getValue()); + } finally { + exec.shutdownNow(); + } + } + + @Test + public void testGetConnectionsWithDifferentJDBCParamsConcurrently() throws Exception { + DriverManager.registerDriver(PhoenixDriver.INSTANCE); + ExecutorService exec = Executors.newFixedThreadPool(4); + // establish url and quorum. Need to use PhoenixDriver and not PhoenixTestDriver + String zkQuorum = "localhost:" + getUtility().getZkCluster().getClientPort(); + String baseUrl = PhoenixRuntime.JDBC_PROTOCOL + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkQuorum; + int numConnections = 20; + List<Callable<Connection>> callables = new ArrayList<>(numConnections); + List<Future<Connection>> futures = new ArrayList<>(numConnections); + try { + GLOBAL_HCONNECTIONS_COUNTER.getMetric().reset(); + GLOBAL_QUERY_SERVICES_COUNTER.getMetric().reset(); + assertEquals(0, GLOBAL_HCONNECTIONS_COUNTER.getMetric().getValue()); + assertEquals(0, GLOBAL_QUERY_SERVICES_COUNTER.getMetric().getValue()); + for (int i = 1; i <= numConnections; i++) { + String customUrl = baseUrl + ':' + CUSTOM_URL_STRING + '=' + i; + Callable<Connection> c = new GetConnectionCallable(customUrl + ";"); + callables.add(c); + futures.add(exec.submit(c)); + } + for (int i = 0; i < futures.size(); i++) { + futures.get(i).get(); + } + assertEquals(numConnections, 
GLOBAL_HCONNECTIONS_COUNTER.getMetric().getValue()); + assertEquals(numConnections, GLOBAL_QUERY_SERVICES_COUNTER.getMetric().getValue()); + } finally { + exec.shutdownNow(); + for (int i = 0; i < futures.size(); i++) { + try { + Connection c = futures.get(i).get(); + // close the query services instance because we created a lot of HConnections. + c.unwrap(PhoenixConnection.class).getQueryServices().close(); + c.close(); + } catch (Exception ignore) {} + } + } + } + + private static class GetConnectionCallable implements Callable<Connection> { + private final String url; + GetConnectionCallable(String url) { + this.url = url; + } + @Override + public Connection call() throws Exception { + Connection c = DriverManager.getConnection(url); + if (!url.contains(CUSTOM_URL_STRING)) { + // check to detect whether a connection was established using the PhoenixDriver + // This is used in our tests to figure out whether a new hconnection and query + // services will be created. + numConnections.incrementAndGet(); + } + return c; + } + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/861c3040/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java index c3a7261..fab4d27 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java @@ -17,6 +17,7 @@ */ package org.apache.phoenix.monitoring; +import static org.apache.phoenix.monitoring.MetricType.HCONNECTIONS_COUNTER; import static org.apache.phoenix.monitoring.MetricType.OPEN_PHOENIX_CONNECTIONS_COUNTER; import static org.apache.phoenix.monitoring.MetricType.QUERY_FAILED_COUNTER; import static 
org.apache.phoenix.monitoring.MetricType.MEMORY_CHUNK_BYTES; @@ -26,6 +27,7 @@ import static org.apache.phoenix.monitoring.MetricType.MUTATION_BYTES; import static org.apache.phoenix.monitoring.MetricType.MUTATION_COMMIT_TIME; import static org.apache.phoenix.monitoring.MetricType.MUTATION_SQL_COUNTER; import static org.apache.phoenix.monitoring.MetricType.NUM_PARALLEL_SCANS; +import static org.apache.phoenix.monitoring.MetricType.QUERY_SERVICES_COUNTER; import static org.apache.phoenix.monitoring.MetricType.QUERY_TIME; import static org.apache.phoenix.monitoring.MetricType.QUERY_TIMEOUT_COUNTER; import static org.apache.phoenix.monitoring.MetricType.TASK_REJECTED_COUNTER; @@ -73,7 +75,9 @@ public enum GlobalClientMetrics { GLOBAL_QUERY_TIMEOUT_COUNTER(QUERY_TIMEOUT_COUNTER), GLOBAL_FAILED_QUERY_COUNTER(QUERY_FAILED_COUNTER), GLOBAL_SPOOL_FILE_COUNTER(SPOOL_FILE_COUNTER), - GLOBAL_OPEN_PHOENIX_CONNECTIONS(OPEN_PHOENIX_CONNECTIONS_COUNTER); + GLOBAL_OPEN_PHOENIX_CONNECTIONS(OPEN_PHOENIX_CONNECTIONS_COUNTER), + GLOBAL_QUERY_SERVICES_COUNTER(QUERY_SERVICES_COUNTER), + GLOBAL_HCONNECTIONS_COUNTER(HCONNECTIONS_COUNTER); private static final boolean isGlobalMetricsEnabled = QueryServicesOptions.withDefaults().isGlobalMetricsEnabled(); private GlobalMetric metric; http://git-wip-us.apache.org/repos/asf/phoenix/blob/861c3040/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java index 6cfe977..b420b75 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java @@ -41,7 +41,9 @@ public enum MetricType { CACHE_REFRESH_SPLITS_COUNTER("Number of times cache was refreshed because of splits"), WALL_CLOCK_TIME_MS("Wall clock time 
elapsed for the overall query execution"), RESULT_SET_TIME_MS("Wall clock time elapsed for reading all records using resultSet.next()"), - OPEN_PHOENIX_CONNECTIONS_COUNTER("Number of open phoenix connections"); + OPEN_PHOENIX_CONNECTIONS_COUNTER("Number of open phoenix connections"), + QUERY_SERVICES_COUNTER("Number of ConnectionQueryServicesImpl instantiated"), + HCONNECTIONS_COUNTER("Number of HConnections created by phoenix driver"); private final String description; http://git-wip-us.apache.org/repos/asf/phoenix/blob/861c3040/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java index 21acd5b..854fac5 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java @@ -26,6 +26,8 @@ import static org.apache.phoenix.coprocessor.MetaDataProtocol.getVersion; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_STATS_NAME; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HCONNECTIONS_COUNTER; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_QUERY_SERVICES_COUNTER; import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_DROP_METADATA; import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_RENEW_LEASE_ENABLED; import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_RENEW_LEASE_THREAD_POOL_SIZE; @@ -151,6 +153,8 @@ import org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus; import 
org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo; +import org.apache.phoenix.monitoring.GlobalClientMetrics; +import org.apache.phoenix.monitoring.GlobalMetric; import org.apache.phoenix.parse.PFunction; import org.apache.phoenix.parse.PSchema; import org.apache.phoenix.protobuf.ProtobufUtil; @@ -392,6 +396,8 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement QueryServices.TRANSACTIONS_ENABLED, QueryServicesOptions.DEFAULT_TRANSACTIONS_ENABLED); this.connection = HBaseFactoryProvider.getHConnectionFactory().createConnection(this.config); + GLOBAL_HCONNECTIONS_COUNTER.increment(); + logger.info("HConnnection established. Details: " + connection + " " + Throwables.getStackTraceAsString(new Exception())); // only initialize the tx service client if needed and if we succeeded in getting a connection // to HBase if (transactionsEnabled) { @@ -457,6 +463,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement return; } closed = true; + GLOBAL_QUERY_SERVICES_COUNTER.decrement(); SQLException sqlE = null; try { // Attempt to return any unused sequences. 
@@ -473,6 +480,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement try { // close the HBase connection if (connection != null) connection.close(); + GLOBAL_HCONNECTIONS_COUNTER.decrement(); } finally { if (renewLeaseExecutor != null) { renewLeaseExecutor.shutdownNow(); @@ -2360,6 +2368,8 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement boolean hConnectionEstablished = false; boolean success = false; try { + GLOBAL_QUERY_SERVICES_COUNTER.increment(); + logger.info("An instance of ConnectionQueryServices was created: " + Throwables.getStackTraceAsString(new Exception())); openConnection(); hConnectionEstablished = true; boolean isDoNotUpgradePropSet = UpgradeUtil.isNoUpgradeSet(props);