This is an automated email from the ASF dual-hosted git repository. mihir6692 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push: new 5d5aaca213 PHOENIX-7038 : Implement Connection Query Service Metrics (#1682) 5d5aaca213 is described below commit 5d5aaca21397f91b9f3fa682cd06da7798d9fc12 Author: Monani Mihir <monani.mi...@gmail.com> AuthorDate: Wed Nov 8 18:17:58 2023 -0800 PHOENIX-7038 : Implement Connection Query Service Metrics (#1682) * PHOENIX-7038 Implement Connection Query Service Metrics * build fix * Get connection count values from ConnectionLimiter.java * CheckStyle length fixes * Checkstyle fixed * NoOpConnectionQueryServicesMetricsManager when it's disabled * Test fix * Spotbugs and checkstyle fixes * Update PhoenixRuntime.java --------- Co-authored-by: Mihir Monani <mihir6...@apache.org> --- .../end2end/RebuildIndexConnectionPropsIT.java | 4 +- .../ConnectionQueryServicesMetricsIT.java | 366 +++++++++++++++++++++ .../org/apache/phoenix/jdbc/PhoenixConnection.java | 53 ++- .../apache/phoenix/log/BaseConnectionLimiter.java | 15 +- .../org/apache/phoenix/log/ConnectionLimiter.java | 4 + .../apache/phoenix/monitoring/AtomicMetric.java | 10 + .../phoenix/monitoring/CombinableMetric.java | 8 + .../phoenix/monitoring/CombinableMetricImpl.java | 10 + .../ConnectionQueryServicesMetric.java} | 29 +- ...java => ConnectionQueryServicesMetricImpl.java} | 25 +- .../phoenix/monitoring/GlobalMetricImpl.java | 10 + .../java/org/apache/phoenix/monitoring/Metric.java | 4 + .../phoenix/monitoring/NoOpGlobalMetricImpl.java | 10 + .../apache/phoenix/monitoring/NonAtomicMetric.java | 10 + .../phoenix/monitoring/PhoenixTableMetricImpl.java | 10 + .../ConnectionQueryServicesHistogram.java | 43 +++ .../ConnectionQueryServicesMetrics.java | 120 +++++++ .../ConnectionQueryServicesMetricsHistograms.java | 71 ++++ .../ConnectionQueryServicesMetricsManager.java | 343 +++++++++++++++++++ .../NoOpConnectionQueryServicesMetricsManager.java | 62 ++++ .../phoenix/query/ConnectionQueryServices.java | 1 + .../phoenix/query/ConnectionQueryServicesImpl.java | 15 +- .../query/ConnectionlessQueryServicesImpl.java | 5 + .../query/DelegateConnectionQueryServices.java | 5 + .../org/apache/phoenix/query/QueryServices.java | 12 + .../apache/phoenix/query/QueryServicesOptions.java | 43 +++ .../org/apache/phoenix/util/PhoenixRuntime.java | 18 + .../ConnectionQueryServicesHistogramTest.java | 73 ++++ ...nnectionQueryServicesMetricsHistogramsTest.java | 37 +++ .../ConnectionQueryServicesMetricsManagerTest.java | 112 +++++++ .../ConnectionQueryServicesMetricsTest.java | 106 ++++++ .../ConnectionQueryServicesNameMetricsTest.java | 87 +++++ 32 files changed, 1685 insertions(+), 36 deletions(-) diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/RebuildIndexConnectionPropsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/RebuildIndexConnectionPropsIT.java index 75ebfa8c35..a7000b69ce 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/RebuildIndexConnectionPropsIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/RebuildIndexConnectionPropsIT.java @@ -107,8 +107,8 @@ public class RebuildIndexConnectionPropsIT extends BaseTest { rebuildQueryServicesConfig.get(HConstants.HBASE_CLIENT_RETRIES_NUMBER)); ConnectionQueryServices rebuildQueryServices = rebuildIndexConnection.getQueryServices(); Connection rebuildIndexHConnection = - (Connection) Whitebox.getInternalState(rebuildQueryServices, - "connection"); + (Connection) Whitebox.getInternalState(Whitebox.getInternalState(rebuildQueryServices, + "parent"), "connection"); Connection regularHConnection = (Connection) Whitebox.getInternalState( regularConnection.getQueryServices(), "connection"); diff --git a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesMetricsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesMetricsIT.java new file mode 100644 index 0000000000..f7d065ec93 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesMetricsIT.java @@ -0,0 +1,366 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.monitoring.connectionqueryservice; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.jdbc.PhoenixDriver; +import org.apache.phoenix.monitoring.ConnectionQueryServicesMetric; +import org.apache.phoenix.monitoring.HistogramDistribution; +import org.apache.phoenix.monitoring.Metric; +import org.apache.phoenix.monitoring.MetricType; +import org.apache.phoenix.query.BaseTest; +import org.apache.phoenix.query.ConfigurationFactory; +import org.apache.phoenix.query.HBaseFactoryProvider; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.query.QueryServicesOptions; +import org.apache.phoenix.util.InstanceResolver; +import org.apache.phoenix.util.PhoenixRuntime; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.Statement; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.phoenix.monitoring.MetricType.OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER; +import static org.apache.phoenix.monitoring.MetricType.OPEN_PHOENIX_CONNECTIONS_COUNTER; +import static org.apache.phoenix.monitoring.MetricType.PHOENIX_CONNECTIONS_THROTTLED_COUNTER; +import static org.apache.phoenix.query.QueryServices.CLIENT_CONNECTION_MAX_ALLOWED_CONNECTIONS; +import static org.apache.phoenix.query.QueryServices.CONNECTION_QUERY_SERVICE_METRICS_ENABLED; +import static org.apache.phoenix.query.QueryServices.INTERNAL_CONNECTION_MAX_ALLOWED_CONNECTIONS; +import static org.apache.phoenix.query.QueryServices.QUERY_SERVICES_NAME; +import static org.apache.phoenix.util.PhoenixRuntime.clearAllConnectionQueryServiceMetrics; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +@Category(NeedsOwnMiniClusterTest.class) +public class ConnectionQueryServicesMetricsIT extends BaseTest { + private static final Logger LOGGER = + LoggerFactory.getLogger(ConnectionQueryServicesMetricsIT.class); + private AtomicInteger counter = new AtomicInteger(); + private static HBaseTestingUtility hbaseTestUtil; + private String tableName; + private static final String CONN_QUERY_SERVICE_1 = "CONN_QUERY_SERVICE_1"; + private static final String + CONN_QUERY_SERVICE_CHECK_CONN_THROTTLE = "CONN_QUERY_SERVICE_CHECK_CONN_THROTTLE"; + private static final String CONN_QUERY_SERVICE_2 = "CONN_QUERY_SERVICE_2"; + private static final String CONN_QUERY_SERVICE_NULL = null; + private enum CompareOp { + LT, EQ, GT, LTEQ, GTEQ + } + + @BeforeClass + public static void doSetup() throws Exception { + InstanceResolver.clearSingletons(); + // Override to get required config for static fields loaded that require HBase config + InstanceResolver.getSingleton(ConfigurationFactory.class, new ConfigurationFactory() { + + @Override public Configuration getConfiguration() { + Configuration conf = HBaseConfiguration.create(); + conf.set(CONNECTION_QUERY_SERVICE_METRICS_ENABLED, String.valueOf(true)); + // Without this config, unlimited connections are allowed from client and connection + // counter won't be increased at all. So we need to set max allowed connection count + conf.set(CLIENT_CONNECTION_MAX_ALLOWED_CONNECTIONS, "2"); + conf.set(INTERNAL_CONNECTION_MAX_ALLOWED_CONNECTIONS, "1"); + return conf; + } + + @Override public Configuration getConfiguration(Configuration confToClone) { + Configuration conf = HBaseConfiguration.create(); + conf.set(CONNECTION_QUERY_SERVICE_METRICS_ENABLED, String.valueOf(true)); + // Without this config, unlimited connections are allowed from client and connection + // counter won't be increased at all. So we need to set max allowed connection count + conf.set(CLIENT_CONNECTION_MAX_ALLOWED_CONNECTIONS, "2"); + conf.set(INTERNAL_CONNECTION_MAX_ALLOWED_CONNECTIONS, "1"); + Configuration copy = new Configuration(conf); + copy.addResource(confToClone); + return copy; + } + }); + Configuration conf = HBaseFactoryProvider.getConfigurationFactory().getConfiguration(); + hbaseTestUtil = new HBaseTestingUtility(conf); + setUpConfigForMiniCluster(conf); + conf.set(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB, + QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS); + hbaseTestUtil.startMiniCluster(); + // establish url and quorum. Need to use PhoenixDriver and not PhoenixTestDriver + String zkQuorum = "localhost:" + hbaseTestUtil.getZkCluster().getClientPort(); + url = PhoenixRuntime.JDBC_PROTOCOL + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkQuorum; + DriverManager.registerDriver(PhoenixDriver.INSTANCE); + } + + @AfterClass + public static void tearDownMiniCluster() { + try { + if (hbaseTestUtil != null) { + hbaseTestUtil.shutdownMiniCluster(); + } + } catch (Exception e) { + // ignore + } + } + + @Before + public void resetTableLevelMetrics() { + clearAllConnectionQueryServiceMetrics(); + tableName = generateUniqueName(); + } + + @After + public void cleanUp() { + clearAllConnectionQueryServiceMetrics(); + } + + private String connUrlWithPrincipal(String principalName) { + return url + (principalName == null ? "" : PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + principalName); + } + + @Test + public void testMultipleCQSIMetricsInParallel() throws Exception { + Thread csqi1 = new Thread(() -> { + try { + checkConnectionQueryServiceMetricsValues(CONN_QUERY_SERVICE_1); + // Increment counter for successful check + counter.incrementAndGet(); + } catch (Exception e) { + e.printStackTrace(); + } + }); + Thread csqi2 = new Thread(() -> { + try { + // We have set limit of 2 for phoenix connection counter in doSetup() function. + // For this one, we will create more than 2 connections and + // test that Connection Throttle Count Metric is also working as expected. + checkConnectionQueryServiceMetricsValues(CONN_QUERY_SERVICE_CHECK_CONN_THROTTLE); + } catch (Exception e) { + e.printStackTrace(); + if(!e.getMessage().equals("This should not be thrown for " + + CONN_QUERY_SERVICE_CHECK_CONN_THROTTLE)) { + // Increment counter for successful check. For this Connection Query Service, + // code will throw error since it will try to create more than 2 connections. + // So we would count exception as success here and increment the counter. + counter.incrementAndGet(); + } + } + }); + Thread csqi3 = new Thread(() -> { + try { + checkConnectionQueryServiceMetricsValues(CONN_QUERY_SERVICE_2); + // Increment counter for successful check + counter.incrementAndGet(); + } catch (Exception e) { + e.printStackTrace(); + } + }); + Thread csqi4 = new Thread(() -> { + try { + // Test default CQS name + checkConnectionQueryServiceMetricsValues(CONN_QUERY_SERVICE_NULL); + // Increment counter for successful check + counter.incrementAndGet(); + } catch (Exception e) { + e.printStackTrace(); + } + }); + + // Start Single Query Service Test + csqi1.start(); + csqi1.join(); + + // Start 3 Query Service Test in parallel + csqi2.start(); + csqi3.start(); + csqi4.start(); + csqi2.join(); + csqi3.join(); + csqi4.join(); + + // Check If all CSQI Metric check passed or not + assertEquals("Number of passing CSQI Metrics check should be : ",4, counter.get()); + } + + private void checkConnectionQueryServiceMetricsValues( + String queryServiceName) throws Exception { + String CREATE_TABLE_DDL = "CREATE TABLE IF NOT EXISTS %s (K VARCHAR(10) NOT NULL" + + " PRIMARY KEY, V VARCHAR)"; + String princURL = connUrlWithPrincipal(queryServiceName); + LOGGER.info("Connection Query Service : " + queryServiceName + " URL : " + princURL); + + String connQueryServiceName; + try (Connection conn = DriverManager.getConnection(princURL); + Statement stmt = conn.createStatement()) { + connQueryServiceName = conn.unwrap(PhoenixConnection.class).getQueryServices() + .getConfiguration().get(QUERY_SERVICES_NAME); + // When queryServiceName is passed as null, Phoenix will change query service name + // to DEFAULT_CQSN. That's why we are re-assigning the query service name here to check + // metric in finally block. + queryServiceName = connQueryServiceName; + stmt.execute(String.format(CREATE_TABLE_DDL, tableName + "_" + connQueryServiceName)); + if (connQueryServiceName.equals(CONN_QUERY_SERVICE_CHECK_CONN_THROTTLE)) { + try(Connection conn1 = DriverManager.getConnection(princURL)) { + assertMetricValues(connQueryServiceName, 2, 0, 0); + assertHistogramMetricsForMutations(connQueryServiceName, 2, 0, 0, + 0); + try(Connection conn2 = DriverManager.getConnection(princURL)) { + // This should never execute in this test. + throw new RuntimeException("This should not be thrown for " + + CONN_QUERY_SERVICE_CHECK_CONN_THROTTLE); + } + } + } else { + // We only create one connection so, + // Open Connection Count : 1 + // Open Internal Connection Count : 0 + // Connection Throttled Count : 0 + assertMetricValues(connQueryServiceName, 1, 0, 0); + assertHistogramMetricsForMutations(connQueryServiceName, 1, 0, 0, 0); + } + } catch (Exception e) { + e.printStackTrace(); + throw e; + } finally { + if (queryServiceName.equals(CONN_QUERY_SERVICE_CHECK_CONN_THROTTLE)) { + // We have closed the connection in try-resource-catch block so, + // Open Connection Count : 0 + // Connection Throttled Count : 1 + // Open Internal Connection Count : 0 + assertMetricValues(queryServiceName, 0, 1, 0); + // In histogram, we will still have max open connection count as 2 + // while rest of the values will be 0. + assertHistogramMetricsForMutations(queryServiceName, 2, 0, 0, 0); + } else { + // We have closed the connection in try-resource-catch block so, + // Open Connection Count : 0 + // Connection Throttled Count : 0 + // Open Internal Connection Count : 0 + assertMetricValues(queryServiceName, 0, 0, 0); + // In histogram, we will still have max open connection count as 1 while rest of the values will be 0. + assertHistogramMetricsForMutations(queryServiceName, 1, 0, 0, 0); + } + } + } + + /** + * check min/max value in histogram + * @param queryServiceName Connection Query Service Name + * @param oMaxValue Max value of {@link MetricType#OPEN_PHOENIX_CONNECTIONS_COUNTER} + * @param oMinValue Min value of {@link MetricType#OPEN_PHOENIX_CONNECTIONS_COUNTER} + * @param ioMaxValue Max value of {@link MetricType#OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER} + * @param ioMinValue Min value of {@link MetricType#OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER} + */ + private void assertHistogramMetricsForMutations( + String queryServiceName, int oMaxValue, int oMinValue, int ioMaxValue, int ioMinValue) { + Map<String, List<HistogramDistribution>> listOfHistoDistribution = + PhoenixRuntime.getAllConnectionQueryServicesHistograms(); + for(HistogramDistribution histo : listOfHistoDistribution.get(queryServiceName)) { + assertHistogram(histo, "PhoenixInternalOpenConn", ioMaxValue, ioMinValue, + CompareOp.EQ); + assertHistogram(histo, "PhoenixOpenConn", oMaxValue, oMinValue, CompareOp.EQ); + } + } + + public void assertHistogram(HistogramDistribution histo, String histoName, long maxValue, + long minValue, CompareOp op) { + if (histo.getHistoName().equals(histoName)) { + switch (op) { + case EQ: + assertEquals(maxValue, histo.getMax()); + assertEquals(minValue, histo.getMin()); + break; + } + } + } + + /** + * check metric value for connection query service + * @param queryServiceName Connection Query Service Name + * @param o {@link MetricType#OPEN_PHOENIX_CONNECTIONS_COUNTER} + * @param ct {@link MetricType#PHOENIX_CONNECTIONS_THROTTLED_COUNTER} + * @param io {@link MetricType#OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER} + */ + public void assertMetricValues(String queryServiceName, int o, int ct, int io) { + Map<String, List<ConnectionQueryServicesMetric>> listOfMetrics = + PhoenixRuntime.getAllConnectionQueryServicesCounters(); + /* + There are 3 metrics which are tracked as part of Phoenix Connection Query Service Metrics. + Defined here : {@link ConnectionQueryServicesMetrics.QueryServiceMetrics} + OPEN_PHOENIX_CONNECTIONS_COUNTER + OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER + PHOENIX_CONNECTIONS_THROTTLED_COUNTER + */ + assertEquals(3, listOfMetrics.get(queryServiceName).size()); + for (ConnectionQueryServicesMetric metric : listOfMetrics.get(queryServiceName)) { + assertMetricValue(metric, OPEN_PHOENIX_CONNECTIONS_COUNTER, o, CompareOp.EQ); + assertMetricValue(metric, PHOENIX_CONNECTIONS_THROTTLED_COUNTER, ct, CompareOp.EQ); + assertMetricValue(metric, OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER, io, CompareOp.EQ); + } + } + + /** + * Check if metrics collection is empty. + */ + public void assertMetricListIsEmpty() { + Map<String, List<ConnectionQueryServicesMetric>> listOfMetrics = + PhoenixRuntime.getAllConnectionQueryServicesCounters(); + assertTrue(listOfMetrics.isEmpty()); + } + + /** + * Checks that if the metric is of the passed in type, it has the expected value + * (based on the CompareOp). If the metric type is different than checkType, ignore + * @param m metric to check + * @param checkType type to check for + * @param compareValue value to compare against + * @param op CompareOp + */ + private static void assertMetricValue(Metric m, MetricType checkType, long compareValue, + CompareOp op) { + if (m.getMetricType().equals(checkType)) { + switch (op) { + case EQ: + assertEquals(compareValue, m.getValue()); + break; + case LT: + assertTrue(m.getValue() < compareValue); + break; + case LTEQ: + assertTrue(m.getValue() <= compareValue); + break; + case GT: + assertTrue(m.getValue() > compareValue); + break; + case GTEQ: + assertTrue(m.getValue() >= compareValue); + break; + } + } + } +} diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java index 493fdf5037..3c078d6ecc 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java @@ -17,13 +17,13 @@ */ package org.apache.phoenix.jdbc; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA; +import static org.apache.phoenix.monitoring.MetricType.OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER; +import static org.apache.phoenix.monitoring.MetricType.OPEN_PHOENIX_CONNECTIONS_COUNTER; +import static org.apache.phoenix.query.QueryServices.QUERY_SERVICES_NAME; import static org.apache.phoenix.thirdparty.com.google.common.base.Preconditions.checkNotNull; import static java.util.Collections.emptyMap; import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_OPEN_INTERNAL_PHOENIX_CONNECTIONS; import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_OPEN_PHOENIX_CONNECTIONS; -import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_PHOENIX_CONNECTIONS_ATTEMPTED_COUNTER; -import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_FAILED_PHOENIX_CONNECTIONS; import java.io.EOFException; import java.io.IOException; @@ -88,6 +88,7 @@ import org.apache.phoenix.log.ConnectionActivityLogger; import org.apache.phoenix.log.LogLevel; import org.apache.phoenix.monitoring.MetricType; import org.apache.phoenix.monitoring.TableMetricsManager; +import org.apache.phoenix.monitoring.connectionqueryservice.ConnectionQueryServicesMetricsManager; import org.apache.phoenix.parse.PFunction; import org.apache.phoenix.parse.PSchema; import org.apache.phoenix.query.ConnectionQueryServices; @@ -398,13 +399,29 @@ public class PhoenixConnection implements MetaDataMutated, SQLCloseable, Phoenix this.logSamplingRate = Double.parseDouble(this.services.getProps().get(QueryServices.LOG_SAMPLE_RATE, QueryServicesOptions.DEFAULT_LOG_SAMPLE_RATE)); - if (isInternalConnection) { - GLOBAL_OPEN_INTERNAL_PHOENIX_CONNECTIONS.increment(); - } else { - GLOBAL_OPEN_PHOENIX_CONNECTIONS.increment(); + String connectionQueryServiceName = + this.services.getConfiguration().get(QUERY_SERVICES_NAME); + if (isInternalConnection) { + GLOBAL_OPEN_INTERNAL_PHOENIX_CONNECTIONS.increment(); + long currentInternalConnectionCount = + this.getQueryServices().getConnectionCount(isInternalConnection); + ConnectionQueryServicesMetricsManager.updateMetrics(connectionQueryServiceName, + OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER, currentInternalConnectionCount); + ConnectionQueryServicesMetricsManager + .updateConnectionQueryServiceOpenInternalConnectionHistogram( + currentInternalConnectionCount, connectionQueryServiceName); + } else { + GLOBAL_OPEN_PHOENIX_CONNECTIONS.increment(); + long currentConnectionCount = + this.getQueryServices().getConnectionCount(isInternalConnection); + ConnectionQueryServicesMetricsManager.updateMetrics(connectionQueryServiceName, + OPEN_PHOENIX_CONNECTIONS_COUNTER, currentConnectionCount); + ConnectionQueryServicesMetricsManager + .updateConnectionQueryServiceOpenConnectionHistogram(currentConnectionCount, + connectionQueryServiceName); } - this.sourceOfOperation = - this.services.getProps().get(QueryServices.SOURCE_OPERATION_ATTRIB, null); + this.sourceOfOperation = this.services.getProps() + .get(QueryServices.SOURCE_OPERATION_ATTRIB, null); } private static void checkScn(Long scnParam) throws SQLException { @@ -756,6 +773,8 @@ public class PhoenixConnection implements MetaDataMutated, SQLCloseable, Phoenix return; } + String connectionQueryServiceName = + this.services.getConfiguration().get(QUERY_SERVICES_NAME); try { isClosing = true; TableMetricsManager.pushMetricsFromConnInstanceMethod(getMutationMetrics()); @@ -781,10 +800,24 @@ public class PhoenixConnection implements MetaDataMutated, SQLCloseable, Phoenix } finally { isClosing = false; isClosed = true; - if(isInternalConnection()){ + if (isInternalConnection()){ GLOBAL_OPEN_INTERNAL_PHOENIX_CONNECTIONS.decrement(); + long currentInternalConnectionCount = + this.getQueryServices().getConnectionCount(isInternalConnection()); + ConnectionQueryServicesMetricsManager.updateMetrics(connectionQueryServiceName, + OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER, currentInternalConnectionCount); + ConnectionQueryServicesMetricsManager + .updateConnectionQueryServiceOpenInternalConnectionHistogram( + currentInternalConnectionCount, connectionQueryServiceName); } else { GLOBAL_OPEN_PHOENIX_CONNECTIONS.decrement(); + long currentConnectionCount = + this.getQueryServices().getConnectionCount(isInternalConnection()); + ConnectionQueryServicesMetricsManager.updateMetrics(connectionQueryServiceName, + OPEN_PHOENIX_CONNECTIONS_COUNTER, currentConnectionCount); + ConnectionQueryServicesMetricsManager + .updateConnectionQueryServiceOpenConnectionHistogram( + currentConnectionCount, connectionQueryServiceName); } } } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/log/BaseConnectionLimiter.java b/phoenix-core/src/main/java/org/apache/phoenix/log/BaseConnectionLimiter.java index ad720c6155..ec07deb465 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/log/BaseConnectionLimiter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/log/BaseConnectionLimiter.java @@ -21,6 +21,7 @@ import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.exception.SQLExceptionInfo; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.monitoring.MetricType; +import org.apache.phoenix.monitoring.connectionqueryservice.ConnectionQueryServicesMetricsManager; import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions; import org.apache.phoenix.thirdparty.com.google.common.base.Strings; import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting; @@ -29,6 +30,8 @@ import javax.annotation.concurrent.GuardedBy; import java.sql.SQLException; import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_PHOENIX_CONNECTIONS_THROTTLED_COUNTER; +import static org.apache.phoenix.monitoring.MetricType.PHOENIX_CONNECTIONS_THROTTLED_COUNTER; +import static org.apache.phoenix.query.QueryServices.QUERY_SERVICES_NAME; /** * A base class for concrete implementation of ConnectionLimiter. @@ -39,6 +42,7 @@ import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_PHOENIX_C public abstract class BaseConnectionLimiter implements ConnectionLimiter { protected int connectionCount = 0; protected int internalConnectionCount = 0; + protected int connectionThrottledCounter = 0; protected String profileName; protected int maxConnectionsAllowed; protected int maxInternalConnectionsAllowed; @@ -65,8 +69,15 @@ public abstract class BaseConnectionLimiter implements ConnectionLimiter { // if throttling threshold is reached, try reclaiming garbage collected phoenix connections. if ((allowedConnections != 0) && (futureConnections > allowedConnections) && (onSweep(connection.isInternalConnection()) == 0)) { GLOBAL_PHOENIX_CONNECTIONS_THROTTLED_COUNTER.increment(); - - //TODO:- After PHOENIX-7038 per profile Phoenix Throttled Counter should be updated here. + connectionThrottledCounter++; + String connectionQueryServiceName = connection.getQueryServices() + .getConfiguration().get(QUERY_SERVICES_NAME); + // Since this is ever-increasing counter and only gets reset at JVM restart + // Both global and connection query service level, + // we won't create histogram for this metric. + ConnectionQueryServicesMetricsManager.updateMetrics( + connectionQueryServiceName, + PHOENIX_CONNECTIONS_THROTTLED_COUNTER, connectionThrottledCounter); // Let the concrete classes handle the onLimit. // They can either throw the exception back or handle it. diff --git a/phoenix-core/src/main/java/org/apache/phoenix/log/ConnectionLimiter.java b/phoenix-core/src/main/java/org/apache/phoenix/log/ConnectionLimiter.java index 3bc6d465b2..6e9f98b9b2 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/log/ConnectionLimiter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/log/ConnectionLimiter.java @@ -36,4 +36,8 @@ public interface ConnectionLimiter { boolean isLastConnection(); boolean isShouldThrottleNumConnections(); + + int getConnectionCount(); + + int getInternalConnectionCount(); } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/AtomicMetric.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/AtomicMetric.java index 728e734da0..3368bceae9 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/AtomicMetric.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/AtomicMetric.java @@ -62,6 +62,16 @@ public class AtomicMetric implements Metric { value.set(0); } + /** + * Set the Metric value as current value + * + * @param value + */ + @Override + public void set(long value) { + this.value.set(value); + } + @Override public void decrement() { value.decrementAndGet(); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/CombinableMetric.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/CombinableMetric.java index 07cd25dfcc..d1ed8ba4fd 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/CombinableMetric.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/CombinableMetric.java @@ -58,6 +58,14 @@ public interface CombinableMetric extends Metric { @Override public void reset() {} + /** + * Set the Metric value as current value + * + * @param value + */ + @Override + public void set(long value) {} + @Override public String getPublishString() { return EMPTY_STRING; diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/CombinableMetricImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/CombinableMetricImpl.java index 40cb5166c3..c2a6e7ba8d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/CombinableMetricImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/CombinableMetricImpl.java @@ -59,6 +59,16 @@ public class CombinableMetricImpl implements CombinableMetric, Cloneable { metric.reset(); } + /** + * Set the Metric value as current value + * + * @param value + */ + @Override + public void set(long value) { + metric.set(value); + } + @Override public String getPublishString() { return getCurrentMetricState(); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/log/ConnectionLimiter.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ConnectionQueryServicesMetric.java similarity index 54% copy from phoenix-core/src/main/java/org/apache/phoenix/log/ConnectionLimiter.java copy to phoenix-core/src/main/java/org/apache/phoenix/monitoring/ConnectionQueryServicesMetric.java index 3bc6d465b2..75ba7e0131 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/log/ConnectionLimiter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ConnectionQueryServicesMetric.java @@ -15,25 +15,24 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.phoenix.log; -import org.apache.phoenix.jdbc.PhoenixConnection; - -import java.sql.SQLException; +package org.apache.phoenix.monitoring; /** - * This interface defines the contract for storing information about Phoenix connections - * for debugging client-side issues like - * {@link org.apache.phoenix.exception.SQLExceptionCode#NEW_CONNECTION_THROTTLED} + * Class that exposes the various phoenix metrics collected at the Phoenix Query Service level. + * Because metrics are dynamic in nature, it is not guaranteed that the state exposed will always + * be in sync with each other. One should use these metrics primarily for monitoring and debugging + * purposes. */ -public interface ConnectionLimiter { - - void acquireConnection(PhoenixConnection connection) throws SQLException; - - void returnConnection(PhoenixConnection connection); +public interface ConnectionQueryServicesMetric extends Metric { - int onSweep(boolean internal) ; + /** + * @return Number of samples collected since the last {@link #reset()} call. + */ + long getNumberOfSamples(); - boolean isLastConnection(); + /** + * @return Sum of the values of the metric sampled since the last {@link #reset()} call. + */ + long getTotalSum(); - boolean isShouldThrottleNumConnections(); } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/PhoenixTableMetricImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ConnectionQueryServicesMetricImpl.java similarity index 78% copy from phoenix-core/src/main/java/org/apache/phoenix/monitoring/PhoenixTableMetricImpl.java copy to phoenix-core/src/main/java/org/apache/phoenix/monitoring/ConnectionQueryServicesMetricImpl.java index 91bade1bd7..bbefbf34d8 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/PhoenixTableMetricImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ConnectionQueryServicesMetricImpl.java @@ -1,4 +1,4 @@ -/* +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -19,27 +19,40 @@ package org.apache.phoenix.monitoring; import java.util.concurrent.atomic.AtomicLong; -public class PhoenixTableMetricImpl implements PhoenixTableMetric { +/** + * Metric class for Connection Query Services Metric. + */ +public class ConnectionQueryServicesMetricImpl implements ConnectionQueryServicesMetric { private AtomicLong numberOfSamples = new AtomicLong(0); private Metric metric; /** - * Default implementation used when TableLevel Metrics are enabled + * Default implementation used when Phoenix Connection Query Service Metrics are enabled */ - public PhoenixTableMetricImpl(MetricType type) { + public ConnectionQueryServicesMetricImpl(MetricType type) { this.metric = new AtomicMetric(type); } /** - * Reset the internal state. Typically called after metric information has been collected and a new phase of - * collection is being requested for the next interval. + * Reset the internal state. Typically called after metric information has been + * collected and a new phase of collection is being requested for the next interval. */ @Override public void reset() { metric.reset(); numberOfSamples.set(0); } + /** + * Set the Metric value as current value + * + * @param value + */ + @Override + public void set(long value) { + metric.set(value); + } + @Override public long getNumberOfSamples() { return numberOfSamples.get(); } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalMetricImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalMetricImpl.java index 8c2128b2c3..ca19c580a2 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalMetricImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalMetricImpl.java @@ -33,6 +33,16 @@ public class GlobalMetricImpl implements GlobalMetric { numberOfSamples.set(0); } + /** + * Set the Metric value as current value + * + * @param value + */ + @Override + public void set(long value) { + metric.set(value); + } + @Override public long getNumberOfSamples() { return numberOfSamples.get(); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/Metric.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/Metric.java index 0e51fc02cb..1f3bdb858d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/Metric.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/Metric.java @@ -60,5 +60,9 @@ public interface Metric { */ public void reset(); + /** + * Set the Metric value as current value + */ + void set(long value); } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/NoOpGlobalMetricImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/NoOpGlobalMetricImpl.java index 2dfe941d2f..d03b27e405 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/NoOpGlobalMetricImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/NoOpGlobalMetricImpl.java @@ -64,4 +64,14 @@ public class NoOpGlobalMetricImpl implements GlobalMetric { public void reset() { } + + /** + * Set the Metric value as current value + * + * @param value + */ + @Override + public void set(long value) { + + } } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/NonAtomicMetric.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/NonAtomicMetric.java index 4e611c5674..77fe093a40 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/NonAtomicMetric.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/NonAtomicMetric.java @@ -63,6 +63,16 @@ class NonAtomicMetric implements Metric { value = 0; } + /** + * Set the Metric value as current value + * + * @param value + */ + @Override + public void set(long value) { + this.value = value; + } + @Override public void decrement() { value--; diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/PhoenixTableMetricImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/PhoenixTableMetricImpl.java index 91bade1bd7..0d6ef7868b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/PhoenixTableMetricImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/PhoenixTableMetricImpl.java @@ -40,6 +40,16 @@ public class PhoenixTableMetricImpl implements PhoenixTableMetric { numberOfSamples.set(0); } + /** + * Set the Metric value as current value + * + * @param value + */ + @Override + public void set(long value) { + metric.set(value); + } + @Override public long getNumberOfSamples() { return numberOfSamples.get(); } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesHistogram.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesHistogram.java new file mode 100644 index 0000000000..883be7b145 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesHistogram.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.monitoring.connectionqueryservice; + +import static org.apache.phoenix.query.QueryServices.CONNECTION_QUERY_SERVICE_HISTOGRAM_SIZE_RANGES; + +import org.apache.hadoop.conf.Configuration; +import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; +import org.apache.phoenix.monitoring.RangeHistogram; +import org.apache.phoenix.query.QueryServices; + +/** + * Histogram for calculating phoenix connection. We read ranges using + * config property {@link QueryServices#PHOENIX_HISTOGRAM_SIZE_RANGES}. + * If this property is not set then it will default to DEFAULT_RANGE values. + */ +public class ConnectionQueryServicesHistogram extends RangeHistogram { + static final long[] DEFAULT_RANGE = {1, 10, 100, 500, 1000}; + public ConnectionQueryServicesHistogram(String name, String description, Configuration conf) { + super(initializeRanges(conf), name, description); + } + + private static long[] initializeRanges(Configuration conf) { + long[] ranges = PhoenixConfigurationUtil.getLongs( + conf, CONNECTION_QUERY_SERVICE_HISTOGRAM_SIZE_RANGES); + return ranges != null ? ranges : DEFAULT_RANGE; + } +} diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesMetrics.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesMetrics.java new file mode 100644 index 0000000000..b3a8a1c3c1 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesMetrics.java @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.monitoring.connectionqueryservice; + +import static org.apache.phoenix.monitoring.MetricType.OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER; +import static org.apache.phoenix.monitoring.MetricType.OPEN_PHOENIX_CONNECTIONS_COUNTER; +import static org.apache.phoenix.monitoring.MetricType.PHOENIX_CONNECTIONS_THROTTLED_COUNTER; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.phoenix.monitoring.ConnectionQueryServicesMetric; +import org.apache.phoenix.monitoring.ConnectionQueryServicesMetricImpl; +import org.apache.phoenix.monitoring.MetricType; + +/** + * Class for Connection Query Service Metrics. + */ +public class ConnectionQueryServicesMetrics { + /** + * List Metrics tracked in Connection Query Service Metrics + */ + public enum QueryServiceMetrics { + CONNECTION_QUERY_SERVICE_OPEN_PHOENIX_CONNECTIONS_COUNTER(OPEN_PHOENIX_CONNECTIONS_COUNTER), + CONNECTION_QUERY_SERVICE_OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER( + OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER), + CONNECTION_QUERY_SERVICE_PHOENIX_CONNECTIONS_THROTTLED_COUNTER( + PHOENIX_CONNECTIONS_THROTTLED_COUNTER); + + private MetricType metricType; + private ConnectionQueryServicesMetric metric; + + QueryServiceMetrics(MetricType metricType) { + this.metricType = metricType; + } + } + + private final String connectionQueryServiceName; + private Map<MetricType, ConnectionQueryServicesMetric> metricRegister; + private ConnectionQueryServicesMetricsHistograms connectionQueryServiceMetricsHistograms; + + public ConnectionQueryServicesMetrics(final String connectionQueryServiceName, + Configuration conf) { + this.connectionQueryServiceName = connectionQueryServiceName; + metricRegister = new HashMap<>(); + for (QueryServiceMetrics connectionQueryServiceMetric + : QueryServiceMetrics.values()) { + connectionQueryServiceMetric.metric = + new ConnectionQueryServicesMetricImpl(connectionQueryServiceMetric.metricType); + metricRegister.put(connectionQueryServiceMetric.metricType, + connectionQueryServiceMetric.metric); + } + connectionQueryServiceMetricsHistograms = + new ConnectionQueryServicesMetricsHistograms(connectionQueryServiceName, conf); + } + + /** + * This function is used to update the value of Metric + * In case of counter val will be passed as 1. + * + * @param type metric type + * @param val update value. In case of counters, this will be 1 + */ + public void setMetricValue(MetricType type, long val) { + if (!metricRegister.containsKey(type)) { + return; + } + ConnectionQueryServicesMetric metric = metricRegister.get(type); + metric.set(val); + } + + /** + * This function is used to get the value of Metric. + * + * @param type metric type + * @return val current value of metric. + */ + public long getMetricValue(MetricType type) { + if (!metricRegister.containsKey(type)) { + return 0; + } + ConnectionQueryServicesMetric metric = metricRegister.get(type); + return metric.getValue(); + } + + public String getConnectionQueryServiceName() { + return connectionQueryServiceName; + } + + /** + * This method is called to aggregate all the Metrics across all Connection Query Service. + * + * @return map of Connection Query Service name -> list of ConnectionQueryServicesMetric. + */ + public List<ConnectionQueryServicesMetric> getAllMetrics() { + return new ArrayList<>(metricRegister.values()); + } + + public ConnectionQueryServicesMetricsHistograms getConnectionQueryServiceHistograms() { + return connectionQueryServiceMetricsHistograms; + } +} diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesMetricsHistograms.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesMetricsHistograms.java new file mode 100644 index 0000000000..29e8a4b4f4 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesMetricsHistograms.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.monitoring.connectionqueryservice; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import edu.umd.cs.findbugs.annotations.SuppressWarnings; +import org.apache.hadoop.conf.Configuration; +import org.apache.phoenix.monitoring.HistogramDistribution; + +/** + * Histogram Metrics for Connection Query Service Metrics. + * 1. Connection count + * 2. Internal Connection Count. + */ +public class ConnectionQueryServicesMetricsHistograms { + private String connectionQueryServicesName; + private ConnectionQueryServicesHistogram connectionQueryServiceOpenInternalSizeHistogram; + private ConnectionQueryServicesHistogram connectionQueryServicesOpenConnSizeHistogram; + + public ConnectionQueryServicesMetricsHistograms(String connectionQueryServiceName, + Configuration conf) { + connectionQueryServicesName = connectionQueryServiceName; + connectionQueryServiceOpenInternalSizeHistogram = new ConnectionQueryServicesHistogram( + "PhoenixInternalOpenConn", + "histogram for number of open internal phoenix connections", conf); + connectionQueryServicesOpenConnSizeHistogram = new ConnectionQueryServicesHistogram( + "PhoenixOpenConn", "histogram for number of open phoenix connections", conf); + } + + public String getConnectionQueryServicesName() { + return this.connectionQueryServicesName; + } + + @SuppressWarnings(value = "EI_EXPOSE_REP", + justification = "It's only used in internally for metrics storage") + public ConnectionQueryServicesHistogram getConnectionQueryServicesInternalOpenConnHisto() { + return connectionQueryServiceOpenInternalSizeHistogram; + } + + @SuppressWarnings(value = "EI_EXPOSE_REP", + justification = "It's only used in internally for metrics storage") + public ConnectionQueryServicesHistogram getConnectionQueryServicesOpenConnHisto() { + return connectionQueryServicesOpenConnSizeHistogram; + } + + public List<HistogramDistribution> getConnectionQueryServicesHistogramsDistribution() { + List<HistogramDistribution> list = new ArrayList(Arrays.asList( + this.connectionQueryServiceOpenInternalSizeHistogram.getRangeHistogramDistribution(), + this.connectionQueryServicesOpenConnSizeHistogram.getRangeHistogramDistribution())); + return Collections.unmodifiableList(list); + } +} diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesMetricsManager.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesMetricsManager.java new file mode 100644 index 0000000000..0175924653 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesMetricsManager.java @@ -0,0 +1,343 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.monitoring.connectionqueryservice; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import edu.umd.cs.findbugs.annotations.SuppressWarnings; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.phoenix.monitoring.ConnectionQueryServicesMetric; +import org.apache.phoenix.monitoring.HistogramDistribution; +import org.apache.phoenix.monitoring.MetricPublisherSupplierFactory; +import org.apache.phoenix.monitoring.MetricServiceResolver; +import org.apache.phoenix.monitoring.MetricType; +import org.apache.phoenix.query.QueryServicesOptions; +import org.apache.phoenix.thirdparty.com.google.common.base.Strings; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Central place where we keep track of all the Connection Query Service metrics. Register each + * Connection Query Service and store the instance of it associated with ConnectionServiceName in a + * map This class exposes following functions as static functions to help catch all exception + * 1.clearAllConnectionQueryServiceMetrics + * 2.getConnectionQueryServicesMetrics + * 3.updateMetrics + */ +public class ConnectionQueryServicesMetricsManager { + private static final Logger LOGGER = + LoggerFactory.getLogger(ConnectionQueryServicesMetricsManager.class); + private static volatile boolean isConnectionQueryServiceMetricsEnabled; + private static volatile boolean isConnectionQueryServiceMetricPublisherEnabled; + private static ConcurrentMap<String, ConnectionQueryServicesMetrics> + connectionQueryServiceMetricsMapping; + // Singleton object + private static volatile ConnectionQueryServicesMetricsManager + connectionQueryServicesMetricsManager = null; + private static volatile MetricPublisherSupplierFactory mPublisher = null; + private static volatile QueryServicesOptions options; + + @SuppressWarnings(value = "ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD", justification = "This " + + "Object is only created once for the JVM") + public ConnectionQueryServicesMetricsManager(QueryServicesOptions opts) { + options = opts; + connectionQueryServiceMetricsMapping = new ConcurrentHashMap<>(); + isConnectionQueryServiceMetricsEnabled = options.isConnectionQueryServiceMetricsEnabled(); + isConnectionQueryServiceMetricPublisherEnabled = + options.isConnectionQueryServiceMetricsPublisherEnabled(); + LOGGER.info("Connection query service metrics enabled : " + + isConnectionQueryServiceMetricsEnabled + " publisher enabled : " + + isConnectionQueryServiceMetricPublisherEnabled); + } + + @SuppressWarnings(value = "EI_EXPOSE_STATIC_REP2", justification = "Only used for testing") + public static void setInstance(ConnectionQueryServicesMetricsManager metricsManager) { + connectionQueryServicesMetricsManager = metricsManager; + } + + /** + * Function to provide instance of ConnectionQueryServiceMetricsManager(Create if needed in + * thread safe manner) + * @return returns instance of ConnectionQueryServicesMetricsManager + */ + @SuppressWarnings(value = "MS_EXPOSE_REP", justification = "Only used internally, not exposed" + + " to external client") + public static ConnectionQueryServicesMetricsManager getInstance() { + if (connectionQueryServicesMetricsManager == null) { + synchronized (ConnectionQueryServicesMetricsManager.class) { + if (connectionQueryServicesMetricsManager == null) { + QueryServicesOptions options = QueryServicesOptions.withDefaults(); + if (options.isConnectionQueryServiceMetricsEnabled()) { + connectionQueryServicesMetricsManager = + new ConnectionQueryServicesMetricsManager(options); + LOGGER.info("Created object for Connection query service metrics manager"); + } else { + connectionQueryServicesMetricsManager = + NoOpConnectionQueryServicesMetricsManager.NO_OP_CONN_QUERY_SERVICES_METRICS_MANAGER; + LOGGER.info("Created object for NoOp Connection query service metrics manager"); + return connectionQueryServicesMetricsManager; + } + registerMetricsPublisher(); + } + } + } + return connectionQueryServicesMetricsManager; + } + + ConnectionQueryServicesMetricsManager() { + + } + + public static void registerMetricsPublisher() { + if (isConnectionQueryServiceMetricPublisherEnabled) { + String className = options.getConnectionQueryServiceMetricsPublisherClass(); + if (className != null) { + MetricServiceResolver mResolver = new MetricServiceResolver(); + LOGGER.info("Connection query service metrics publisher className " + + className); + try { + mPublisher = mResolver.instantiate(className); + mPublisher.registerMetricProvider(); + } catch (Throwable e) { + LOGGER.error("The exception from metric publish Function", e); + } + + } else { + LOGGER.warn("Connection query service metrics publisher className" + + " can't be null"); + } + } + } + + /** + * Function to provide Object of ConnectionQueryServicesMetrics (Create if needed in + * thread safe manner) for connectionQueryServiceName + * @param connectionQueryServiceName Connection Query Service Name + * @return returns instance of ConnectionQueryServicesMetrics for connectionQueryServiceName + */ + ConnectionQueryServicesMetrics getConnectionQueryServiceMetricsInstance( + String connectionQueryServiceName) { + if (Strings.isNullOrEmpty(connectionQueryServiceName)) { + LOGGER.warn("Connection query service Name can't be null or empty"); + return null; + } + + ConnectionQueryServicesMetrics cqsInstance = + connectionQueryServiceMetricsMapping.get(connectionQueryServiceName); + if (cqsInstance == null) { + synchronized (ConnectionQueryServicesMetricsManager.class) { + cqsInstance = connectionQueryServiceMetricsMapping.get(connectionQueryServiceName); + if (cqsInstance == null) { + + LOGGER.info("Creating connection query service metrics object for : " + + connectionQueryServiceName); + cqsInstance = new ConnectionQueryServicesMetrics(connectionQueryServiceName, + options.getConfiguration()); + connectionQueryServiceMetricsMapping + .put(connectionQueryServiceName, cqsInstance); + } + } + } + return cqsInstance; + } + + /** + * This function will be used to add individual MetricType to LocalStore. Also this will serve + * as LocalStore to store connection query service metrics before their current value is added + * to histogram. + * This func is only used for metrics which are counter based, where values increases or + * decreases frequently. Like Open Conn Counter. This function will first retrieve it's current + * value and increment or decrement (by +/-1) it as required then update the new values. + * <br> + * Example :- OPEN_PHOENIX_CONNECTIONS_COUNTER, OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER + * <br> + * <br> + * histogram will update with each increment/decrement. + * @param connectionQueryServiceName + * @param type + * @param value + */ + void updateMetricsValue(String connectionQueryServiceName, MetricType type, + long value) { + + long startTime = EnvironmentEdgeManager.currentTime(); + + ConnectionQueryServicesMetrics cqsInstance = + getConnectionQueryServiceMetricsInstance(connectionQueryServiceName); + if (cqsInstance == null) { + return; + } + cqsInstance.setMetricValue(type, value); + + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("Connection query service metrics completed updating metric " + + type + " to value " + value + ", timetaken = " + + (EnvironmentEdgeManager.currentTime() - startTime)); + } + } + + /** + * static functions to push, update or retrieve ConnectionQueryService Metrics. + * @param connectionQueryServiceName name of the connection query service + * @param type type of metric + * @param value metric value + */ + public static void updateMetrics(String connectionQueryServiceName, MetricType type, + long value) { + try { + ConnectionQueryServicesMetricsManager.getInstance() + .updateMetricsValue(connectionQueryServiceName, type, value); + } catch (Exception e) { + LOGGER.error("Failed updating connection query service metrics", e); + } + } + + public static Map<String, List<ConnectionQueryServicesMetric>> getAllConnectionQueryServicesMetrics() { + return ConnectionQueryServicesMetricsManager.getInstance() + .getConnectionQueryServicesMetrics(); + } + + /** + * This function will return all the counters for Phoenix connection query service. + * @return Map of all ConnectionQueryService Metrics. + */ + Map<String, List<ConnectionQueryServicesMetric>> getConnectionQueryServicesMetrics() { + try { + long startTime = EnvironmentEdgeManager.currentTime(); + Map<String, List<ConnectionQueryServicesMetric>> map = new HashMap<>(); + for (Map.Entry<String, ConnectionQueryServicesMetrics> entry + : connectionQueryServiceMetricsMapping.entrySet()) { + map.put(entry.getKey(), entry.getValue().getAllMetrics()); + } + long timeTakenForMetricConversion = EnvironmentEdgeManager.currentTime() - startTime; + LOGGER.info("Connection query service metrics fetching complete, timeTaken: " + + timeTakenForMetricConversion); + return map; + } catch (Exception e) { + LOGGER.error("Failed retrieving connection query service Metrics", e); + } + return null; + } + + public static Map<String, List<HistogramDistribution>> getHistogramsForAllConnectionQueryServices() { + return ConnectionQueryServicesMetricsManager.getInstance() + .getHistogramsForConnectionQueryServices(); + } + + /** + * This function will return histogram for all the Phoenix connection query service metrics. + * @return Map of all ConnectionServiceMetrics Histogram + */ + Map<String, List<HistogramDistribution>> getHistogramsForConnectionQueryServices() { + Map<String, List<HistogramDistribution>> map = new HashMap<>(); + for (Map.Entry<String, ConnectionQueryServicesMetrics> entry + : connectionQueryServiceMetricsMapping.entrySet()) { + ConnectionQueryServicesMetricsHistograms connectionQueryServiceHistogramsHistograms = + entry.getValue().getConnectionQueryServiceHistograms(); + map.put(entry.getKey(), connectionQueryServiceHistogramsHistograms + .getConnectionQueryServicesHistogramsDistribution()); + } + return map; + } + + /** + * Function to update {@link MetricType#OPEN_PHOENIX_CONNECTIONS_COUNTER} counter value in + * Histogram + * @param connCount current count of + * {@link MetricType#OPEN_PHOENIX_CONNECTIONS_COUNTER} + * @param connectionQueryServiceName ConnectionQueryService name + */ + public static void updateConnectionQueryServiceOpenConnectionHistogram(long connCount, + String connectionQueryServiceName) { + ConnectionQueryServicesMetrics metrics = + getInstance().getConnectionQueryServiceMetricsInstance(connectionQueryServiceName); + if (metrics == null) { + return; + } + metrics.getConnectionQueryServiceHistograms().getConnectionQueryServicesOpenConnHisto() + .add(connCount); + } + + /** + * Function to update {@link MetricType#OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER} counter value + * in Histogram + * @param connCount current count of + * {@link + * MetricType#OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER} + * @param connectionQueryServiceName ConnectionQueryService name + */ + public static void updateConnectionQueryServiceOpenInternalConnectionHistogram(long connCount, + String connectionQueryServiceName) { + ConnectionQueryServicesMetrics metrics = + getInstance().getConnectionQueryServiceMetricsInstance(connectionQueryServiceName); + if (metrics == null) { + return; + } + metrics.getConnectionQueryServiceHistograms() + .getConnectionQueryServicesInternalOpenConnHisto().add(connCount); + } + + ///////////////////////////////////////////////////////// + ////// Below Functions are majorly used in testing ////// + ///////////////////////////////////////////////////////// + + public static ConnectionQueryServicesHistogram getConnectionQueryServiceOpenInternalConnectionHistogram( + String connectionQueryServiceName) { + ConnectionQueryServicesMetrics metrics = + getInstance().getConnectionQueryServiceMetricsInstance(connectionQueryServiceName); + if (metrics == null) { + return null; + } + return metrics.getConnectionQueryServiceHistograms() + .getConnectionQueryServicesInternalOpenConnHisto(); + } + + public static ConnectionQueryServicesHistogram + getConnectionQueryServiceOpenConnectionHistogram(String connectionQueryServiceName) { + ConnectionQueryServicesMetrics metrics = + getInstance().getConnectionQueryServiceMetricsInstance(connectionQueryServiceName); + if (metrics == null) { + return null; + } + return metrics.getConnectionQueryServiceHistograms() + .getConnectionQueryServicesOpenConnHisto(); + } + + /** + * Helps reset the localstore(connectionQueryServiceMetricsMapping) + */ + void clearConnectionQueryServiceMetrics() { + if (connectionQueryServiceMetricsMapping != null) { + connectionQueryServiceMetricsMapping.clear(); + } + LOGGER.info("Connection query service metrics clearing complete"); + } + + public static void clearAllConnectionQueryServiceMetrics() { + try { + ConnectionQueryServicesMetricsManager.getInstance() + .clearConnectionQueryServiceMetrics(); + } catch (Exception e) { + LOGGER.error("Failed resetting connection query service Metrics", e); + } + } +} diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/connectionqueryservice/NoOpConnectionQueryServicesMetricsManager.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/connectionqueryservice/NoOpConnectionQueryServicesMetricsManager.java new file mode 100644 index 0000000000..cf3ec00de3 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/connectionqueryservice/NoOpConnectionQueryServicesMetricsManager.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.monitoring.connectionqueryservice; + +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import org.apache.phoenix.monitoring.ConnectionQueryServicesMetric; +import org.apache.phoenix.monitoring.HistogramDistribution; +import org.apache.phoenix.monitoring.MetricType; + +/** + * ConnectionQueryServicesMetricsManager will be replaced by this class when + * {@link org.apache.phoenix.query.QueryServices#CONNECTION_QUERY_SERVICE_METRICS_ENABLED} flag is + * set to false. + */ +public class NoOpConnectionQueryServicesMetricsManager extends ConnectionQueryServicesMetricsManager { + + public static final NoOpConnectionQueryServicesMetricsManager NO_OP_CONN_QUERY_SERVICES_METRICS_MANAGER = + new NoOpConnectionQueryServicesMetricsManager(); + + private NoOpConnectionQueryServicesMetricsManager() { + super(); + } + + void updateMetricsValue(String connectionQueryServiceName, MetricType type, + long value) { + } + + Map<String, List<ConnectionQueryServicesMetric>> getConnectionQueryServicesMetrics() { + return Collections.emptyMap(); + } + + Map<String, List<HistogramDistribution>> getHistogramsForConnectionQueryServices() { + return Collections.emptyMap(); + } + + void clearConnectionQueryServiceMetrics() { + + } + + ConnectionQueryServicesMetrics getConnectionQueryServiceMetricsInstance( + String connectionQueryServiceName) { + return null; + } +} diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java index b89ad3e435..b26c6cf75e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java @@ -226,4 +226,5 @@ public interface ConnectionQueryServices extends QueryServices, MetaDataMutated throw new UnsupportedOperationException(); } + int getConnectionCount(boolean isInternal); } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java index dcc29f980e..ad4d2a48e2 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java @@ -306,7 +306,6 @@ import org.apache.phoenix.util.ReadOnlyProps; import org.apache.phoenix.util.SchemaUtil; import org.apache.phoenix.util.ServerUtil; import org.apache.phoenix.util.StringUtil; -import org.apache.phoenix.util.TimeKeeper; import org.apache.phoenix.util.UpgradeUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -455,6 +454,11 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement for (Entry<String,String> entry : connectionInfo.asProps()) { config.set(entry.getKey(), entry.getValue()); } + if (connectionInfo.getPrincipal() != null) { + config.set(QUERY_SERVICES_NAME, connectionInfo.getPrincipal()); + } + LOGGER.info(String.format("CQS initialized with connection query service : %s", + config.get(QUERY_SERVICES_NAME))); this.connectionInfo = connectionInfo; // Without making a copy of the configuration we cons up, we lose some of our properties @@ -789,6 +793,15 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement return latestMetaData; } + @Override + public int getConnectionCount(boolean isInternal) { + if (isInternal) { + return connectionLimiter.getInternalConnectionCount(); + } else { + return connectionLimiter.getConnectionCount(); + } + } + @Override public void addTable(PTable table, long resolvedTime) throws SQLException { synchronized (latestMetaDataLock) { diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java index 7da182857d..87f0bd63fe 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java @@ -809,4 +809,9 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple public PMetaData getMetaDataCache() { return metaData; } + + @Override + public int getConnectionCount(boolean isInternal) { + return 0; + } } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java index 72ee10c215..46edad7ce9 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java @@ -416,4 +416,9 @@ public class DelegateConnectionQueryServices extends DelegateQueryServices imple public ConnectionLimiter getConnectionLimiter() { return getDelegate().getConnectionLimiter(); } + + @Override + public int getConnectionCount(boolean isInternal) { + return getDelegate().getConnectionCount(isInternal); + } } \ No newline at end of file diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java index d300929ac7..af2c518a1c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java @@ -56,6 +56,7 @@ public interface QueryServices extends SQLCloseable { "phoenix.query.server.orderBy.spooling.enabled"; public static final String HBASE_CLIENT_KEYTAB = "hbase.myclient.keytab"; public static final String HBASE_CLIENT_PRINCIPAL = "hbase.myclient.principal"; + String QUERY_SERVICES_NAME = "phoenix.query.services.name"; public static final String SPOOL_DIRECTORY = "phoenix.spool.directory"; public static final String AUTO_COMMIT_ATTRIB = "phoenix.connection.autoCommit"; // consistency configuration setting @@ -392,6 +393,17 @@ public interface QueryServices extends SQLCloseable { public static final String PHOENIX_HISTOGRAM_LATENCY_RANGES = "phoenix.histogram.latency.ranges"; // The range of bins for size metrics for histogram. public static final String PHOENIX_HISTOGRAM_SIZE_RANGES = "phoenix.histogram.size.ranges"; + + // Connection Query Service Metrics Configs + String CONNECTION_QUERY_SERVICE_METRICS_ENABLED = "phoenix.conn.query.service.metrics.enabled"; + String CONNECTION_QUERY_SERVICE_METRICS_PUBLISHER_CLASSNAME = + "phoenix.monitoring.connection.query.service.metricProvider.className"; + String CONNECTION_QUERY_SERVICE_METRICS_PUBLISHER_ENABLED = + "phoenix.conn.query.service.metricsPublisher.enabled"; + // The range of bins for Connection Query Service Metrics of histogram. + String CONNECTION_QUERY_SERVICE_HISTOGRAM_SIZE_RANGES = + "phoenix.conn.query.service.histogram.size.ranges"; + // This config is used to move (copy and delete) the child links from the SYSTEM.CATALOG to SYSTEM.CHILD_LINK table. // As opposed to a copy and async (out of band) delete. public static final String MOVE_CHILD_LINKS_DURING_UPGRADE_ENABLED = "phoenix.move.child_link.during.upgrade"; diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java index 81a2a29004..b39fb788a6 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java @@ -30,6 +30,10 @@ import static org.apache.phoenix.query.QueryServices.COLLECT_REQUEST_LEVEL_METRI import static org.apache.phoenix.query.QueryServices.COMMIT_STATS_ASYNC; import static org.apache.phoenix.query.QueryServices.CONNECTION_ACTIVITY_LOGGING_ENABLED; import static org.apache.phoenix.query.QueryServices.CONNECTION_ACTIVITY_LOGGING_INTERVAL; +import static org.apache.phoenix.query.QueryServices.CONNECTION_QUERY_SERVICE_HISTOGRAM_SIZE_RANGES; +import static org.apache.phoenix.query.QueryServices.CONNECTION_QUERY_SERVICE_METRICS_ENABLED; +import static org.apache.phoenix.query.QueryServices.CONNECTION_QUERY_SERVICE_METRICS_PUBLISHER_ENABLED; +import static org.apache.phoenix.query.QueryServices.CONNECTION_QUERY_SERVICE_METRICS_PUBLISHER_CLASSNAME; import static org.apache.phoenix.query.QueryServices.COST_BASED_OPTIMIZER_ENABLED; import static org.apache.phoenix.query.QueryServices.DATE_FORMAT_ATTRIB; import static org.apache.phoenix.query.QueryServices.DATE_FORMAT_TIMEZONE_ATTRIB; @@ -71,6 +75,7 @@ import static org.apache.phoenix.query.QueryServices.MIN_STATS_UPDATE_FREQ_MS_AT import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_ATTRIB; import static org.apache.phoenix.query.QueryServices.NUM_RETRIES_FOR_SCHEMA_UPDATE_CHECK; import static org.apache.phoenix.query.QueryServices.PHOENIX_ACLS_ENABLED; +import static org.apache.phoenix.query.QueryServices.QUERY_SERVICES_NAME; import static org.apache.phoenix.query.QueryServices.QUEUE_SIZE_ATTRIB; import static org.apache.phoenix.query.QueryServices.REGIONSERVER_INFO_PORT_ATTRIB; import static org.apache.phoenix.query.QueryServices.RENEW_LEASE_ENABLED; @@ -322,6 +327,14 @@ public class QueryServicesOptions { public static final long DEFAULT_INDEX_POPULATION_SLEEP_TIME = 5000; + // Phoenix Connection Query Service configuration Defaults + public static final String DEFAULT_QUERY_SERVICES_NAME = "DEFAULT_CQSN"; + public static final String DEFAULT_CONNECTION_QUERY_SERVICE_HISTOGRAM_SIZE_RANGES = + "1, 10, 100, 500, 1000"; + public static final boolean DEFAULT_IS_CONNECTION_QUERY_SERVICE_METRICS_PUBLISHER_ENABLED = + false; + public static final boolean DEFAULT_IS_CONNECTION_QUERY_SERVICE_METRICS_ENABLED = false; + public static final boolean DEFAULT_RENEW_LEASE_ENABLED = true; public static final int DEFAULT_RUN_RENEW_LEASE_FREQUENCY_INTERVAL_MILLISECONDS = DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD / 2; @@ -495,7 +508,14 @@ public class QueryServicesOptions { .setIfUnset(CLIENT_METRICS_TAG, DEFAULT_CLIENT_METRICS_TAG) .setIfUnset(CLIENT_INDEX_ASYNC_THRESHOLD, DEFAULT_CLIENT_INDEX_ASYNC_THRESHOLD) .setIfUnset(PHOENIX_TTL_SERVER_SIDE_MASKING_ENABLED, DEFAULT_SERVER_SIDE_MASKING_ENABLED) + .setIfUnset(QUERY_SERVICES_NAME, DEFAULT_QUERY_SERVICES_NAME) .setIfUnset(INDEX_CREATE_DEFAULT_STATE, DEFAULT_CREATE_INDEX_STATE) + .setIfUnset(CONNECTION_QUERY_SERVICE_HISTOGRAM_SIZE_RANGES, + DEFAULT_CONNECTION_QUERY_SERVICE_HISTOGRAM_SIZE_RANGES) + .setIfUnset(CONNECTION_QUERY_SERVICE_METRICS_ENABLED, + DEFAULT_IS_CONNECTION_QUERY_SERVICE_METRICS_ENABLED) + .setIfUnset(CONNECTION_QUERY_SERVICE_METRICS_PUBLISHER_ENABLED, + DEFAULT_IS_CONNECTION_QUERY_SERVICE_METRICS_PUBLISHER_ENABLED) .setIfUnset(SKIP_SYSTEM_TABLES_EXISTENCE_CHECK, DEFAULT_SKIP_SYSTEM_TABLES_EXISTENCE_CHECK) .setIfUnset(MAX_IN_LIST_SKIP_SCAN_SIZE, DEFAULT_MAX_IN_LIST_SKIP_SCAN_SIZE) @@ -754,6 +774,29 @@ public class QueryServicesOptions { return config.getBoolean(METRIC_PUBLISHER_ENABLED, DEFAULT_IS_METRIC_PUBLISHER_ENABLED); } + public boolean isConnectionQueryServiceMetricsEnabled() { + return config.getBoolean(CONNECTION_QUERY_SERVICE_METRICS_ENABLED, + DEFAULT_IS_CONNECTION_QUERY_SERVICE_METRICS_ENABLED); + } + + public boolean isConnectionQueryServiceMetricsPublisherEnabled() { + return config.getBoolean(CONNECTION_QUERY_SERVICE_METRICS_PUBLISHER_ENABLED, + DEFAULT_IS_CONNECTION_QUERY_SERVICE_METRICS_PUBLISHER_ENABLED); + } + + public String getQueryServicesName() { + return config.get(QUERY_SERVICES_NAME, DEFAULT_QUERY_SERVICES_NAME); + } + + public void setConnectionQueryServiceMetricsEnabled() { + set(CONNECTION_QUERY_SERVICE_METRICS_ENABLED, true); + } + + public String getConnectionQueryServiceMetricsPublisherClass() { + return config.get(CONNECTION_QUERY_SERVICE_METRICS_PUBLISHER_CLASSNAME, + DEFAULT_METRIC_PUBLISHER_CLASS_NAME); + } + @VisibleForTesting public void setAllowedListForTableLevelMetrics(String tableNameList){ set(ALLOWED_LIST_FOR_TABLE_LEVEL_METRICS,tableNameList); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java index d4448a786c..f92a92c02b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java @@ -67,12 +67,14 @@ import org.apache.phoenix.jdbc.PhoenixMonitoredConnection; import org.apache.phoenix.jdbc.PhoenixMonitoredResultSet; import org.apache.phoenix.jdbc.PhoenixPreparedStatement; import org.apache.phoenix.jdbc.PhoenixStatement; +import org.apache.phoenix.monitoring.ConnectionQueryServicesMetric; import org.apache.phoenix.monitoring.GlobalClientMetrics; import org.apache.phoenix.monitoring.GlobalMetric; import org.apache.phoenix.monitoring.HistogramDistribution; import org.apache.phoenix.monitoring.MetricType; import org.apache.phoenix.monitoring.PhoenixTableMetric; import org.apache.phoenix.monitoring.TableMetricsManager; +import org.apache.phoenix.monitoring.connectionqueryservice.ConnectionQueryServicesMetricsManager; import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.schema.AmbiguousColumnException; @@ -1465,6 +1467,22 @@ public class PhoenixRuntime { return TableMetricsManager.getSizeHistogramsForAllTables(); } + public static Map<String, List<HistogramDistribution>> getAllConnectionQueryServicesHistograms() { + return ConnectionQueryServicesMetricsManager.getHistogramsForAllConnectionQueryServices(); + } + + public static Map<String, List<ConnectionQueryServicesMetric>> getAllConnectionQueryServicesCounters() { + return ConnectionQueryServicesMetricsManager.getAllConnectionQueryServicesMetrics(); + } + + /** + * This is only used in testcases to reset the connection query services Metrics data + */ + @VisibleForTesting + public static void clearAllConnectionQueryServiceMetrics() { + ConnectionQueryServicesMetricsManager.clearAllConnectionQueryServiceMetrics(); + } + /** * This is only used in testcases to reset the tableLevel Metrics data */ diff --git a/phoenix-core/src/test/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesHistogramTest.java b/phoenix-core/src/test/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesHistogramTest.java new file mode 100644 index 0000000000..5b3b107df7 --- /dev/null +++ b/phoenix-core/src/test/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesHistogramTest.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.monitoring.connectionqueryservice; + +import org.apache.hadoop.conf.Configuration; +import org.apache.phoenix.query.QueryServices; +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; + +public class ConnectionQueryServicesHistogramTest { + + @Test + public void testConnectionQueryServiceHistogramRangeOverride() { + String histoName = "PhoenixInternalOpenConn"; + Configuration conf = new Configuration(); + conf.set(QueryServices.CONNECTION_QUERY_SERVICE_HISTOGRAM_SIZE_RANGES, "2, 5, 8"); + ConnectionQueryServicesHistogram histogram = new ConnectionQueryServicesHistogram(histoName, + "histogram for Number of open internal phoenix connections", conf); + Assert.assertEquals(histoName, histogram.getName()); + long[] ranges = histogram.getRanges(); + Assert.assertNotNull(ranges); + long[] expectRanges = {2,5,8}; + Assert.assertArrayEquals(expectRanges, ranges); + } + + @Test + public void testEveryRangeInDefaultRange() { + //1, 3, 7, 9, 15, 30, 120, 600 + Configuration conf = new Configuration(); + String histoName = "PhoenixInternalOpenConn"; + conf.unset(QueryServices.CONNECTION_QUERY_SERVICE_HISTOGRAM_SIZE_RANGES); + ConnectionQueryServicesHistogram histogram = new ConnectionQueryServicesHistogram(histoName, + "histogram for Number of open internal phoenix connections", conf); + Assert.assertEquals(histoName, histogram.getName()); + Assert.assertEquals(ConnectionQueryServicesHistogram.DEFAULT_RANGE, histogram.getRanges()); + + histogram.add(1); + histogram.add(3); + histogram.add(7); + histogram.add(9); + histogram.add(15); + histogram.add(30); + histogram.add(120); + histogram.add(600); + + Map<String, Long> distribution = histogram.getRangeHistogramDistribution().getRangeDistributionMap(); + Map<String, Long> expectedMap = new HashMap<>(); + expectedMap.put("0,1", 1l); + expectedMap.put("1,10", 3l); + expectedMap.put("10,100", 2l); + expectedMap.put("100,500", 1l); + expectedMap.put("500,1000", 1l); + Assert.assertEquals(expectedMap, distribution); + } +} diff --git a/phoenix-core/src/test/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesMetricsHistogramsTest.java b/phoenix-core/src/test/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesMetricsHistogramsTest.java new file mode 100644 index 0000000000..f9fa36375d --- /dev/null +++ b/phoenix-core/src/test/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesMetricsHistogramsTest.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.monitoring.connectionqueryservice; + +import org.apache.hadoop.conf.Configuration; +import org.junit.Assert; +import org.junit.Test; + +public class ConnectionQueryServicesMetricsHistogramsTest { + @Test + public void testConnectionQueryServiceMetricsHistograms() { + String connectionQueryServiceName = "USE_CASE_1"; + Configuration conf = new Configuration(); + ConnectionQueryServicesMetricsHistograms + connectionQueryServiceMetricsHistograms = new ConnectionQueryServicesMetricsHistograms(connectionQueryServiceName, conf); + Assert.assertEquals(connectionQueryServiceName, connectionQueryServiceMetricsHistograms.getConnectionQueryServicesName()); + Assert.assertNotNull(connectionQueryServiceMetricsHistograms.getConnectionQueryServicesOpenConnHisto()); + Assert.assertNotNull(connectionQueryServiceMetricsHistograms.getConnectionQueryServicesInternalOpenConnHisto()); + + Assert.assertEquals(2, connectionQueryServiceMetricsHistograms.getConnectionQueryServicesHistogramsDistribution().size()); + } +} diff --git a/phoenix-core/src/test/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesMetricsManagerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesMetricsManagerTest.java new file mode 100644 index 0000000000..b2073433c4 --- /dev/null +++ b/phoenix-core/src/test/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesMetricsManagerTest.java @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.monitoring.connectionqueryservice; + +import org.apache.hadoop.conf.Configuration; +import org.apache.phoenix.monitoring.ConnectionQueryServicesMetric; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.query.QueryServicesOptions; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; + +import java.util.List; +import java.util.Map; + +import static org.apache.phoenix.monitoring.MetricType.OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER; +import static org.apache.phoenix.monitoring.MetricType.OPEN_PHOENIX_CONNECTIONS_COUNTER; +import static org.apache.phoenix.monitoring.MetricType.PHOENIX_CONNECTIONS_THROTTLED_COUNTER; +import static org.apache.phoenix.monitoring.connectionqueryservice.ConnectionQueryServicesNameMetricsTest.connectionQueryServiceNames; +import static org.apache.phoenix.monitoring.connectionqueryservice.ConnectionQueryServicesNameMetricsTest.openInternalPhoenixConnCounter; +import static org.apache.phoenix.monitoring.connectionqueryservice.ConnectionQueryServicesNameMetricsTest.openPhoenixConnCounter; +import static org.apache.phoenix.monitoring.connectionqueryservice.ConnectionQueryServicesNameMetricsTest.phoenixConnThrottledCounter; +import static org.junit.Assert.assertTrue; + +public class ConnectionQueryServicesMetricsManagerTest { + public boolean verifyMetricsReset(){ + Map<String, List<ConnectionQueryServicesMetric>> map = + ConnectionQueryServicesMetricsManager.getAllConnectionQueryServicesMetrics(); + return map != null && map.isEmpty(); + } + + public boolean verifyConnectionQueryServiceNamesExists(String connectionQueryServiceName){ + Map<String,List<ConnectionQueryServicesMetric>>map = + ConnectionQueryServicesMetricsManager.getAllConnectionQueryServicesMetrics(); + return map != null && map.containsKey(connectionQueryServiceName); + } + + @Test + public void testConnectionQueryServiceMetricsForUpdateMetricsMethod() { + + QueryServicesOptions options = QueryServicesOptions.withDefaults(); + options.setConnectionQueryServiceMetricsEnabled(); + ConnectionQueryServicesMetricsManager connectionQueryServicesMetricsManager = + new ConnectionQueryServicesMetricsManager(options); + ConnectionQueryServicesMetricsManager.setInstance(connectionQueryServicesMetricsManager); + + ConnectionQueryServicesNameMetricsTest + testData = new ConnectionQueryServicesNameMetricsTest(); + testData.populateMetrics(); + for(int i = 0; i < connectionQueryServiceNames.length; i++) { + ConnectionQueryServicesMetricsManager.updateMetrics(connectionQueryServiceNames[i], + OPEN_PHOENIX_CONNECTIONS_COUNTER, openPhoenixConnCounter[i]); + ConnectionQueryServicesMetricsManager.updateMetrics(connectionQueryServiceNames[i], + OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER, openInternalPhoenixConnCounter[i]); + ConnectionQueryServicesMetricsManager.updateMetrics(connectionQueryServiceNames[i], + PHOENIX_CONNECTIONS_THROTTLED_COUNTER, phoenixConnThrottledCounter[i]); + } + testData.verfiyCountOfConnectionQueryServices(connectionQueryServiceNames.length); + ConnectionQueryServicesMetricsManager.clearAllConnectionQueryServiceMetrics(); + assertTrue(verifyMetricsReset()); + } + + @Test + public void testHistogramMetricsForOpenPhoenixConnectionCounter() { + String connectionQueryServiceName = "USE_CASE_1"; + Configuration conf = new Configuration(); + conf.set(QueryServices.CONNECTION_QUERY_SERVICE_HISTOGRAM_SIZE_RANGES, "3, 6, 9"); + + QueryServicesOptions mockOptions = Mockito.mock(QueryServicesOptions.class); + Mockito.doReturn(true).when(mockOptions) + .isConnectionQueryServiceMetricsEnabled(); + Mockito.doReturn(conf).when(mockOptions).getConfiguration(); + ConnectionQueryServicesMetricsManager connectionQueryServicesMetricsManager = + new ConnectionQueryServicesMetricsManager(mockOptions); + ConnectionQueryServicesMetricsManager.setInstance(connectionQueryServicesMetricsManager); + for (int i=0; i<9; i++) { + updateMetricsAndHistogram(i+1, connectionQueryServiceName); + } + + + // Generate distribution map from histogram snapshots. + ConnectionQueryServicesHistogram connectionQueryServicesHistogram = + ConnectionQueryServicesMetricsManager.getConnectionQueryServiceOpenConnectionHistogram(connectionQueryServiceName); + + Map<String, Long> openPhoenixConnMap = connectionQueryServicesHistogram.getRangeHistogramDistribution().getRangeDistributionMap(); + for (Long count: openPhoenixConnMap.values()) { + Assert.assertEquals(new Long(3), count); + } + } + + private void updateMetricsAndHistogram (long counter, String connectionQueryServiceName) { + ConnectionQueryServicesMetricsManager.updateMetrics(connectionQueryServiceName, + OPEN_PHOENIX_CONNECTIONS_COUNTER, counter); + ConnectionQueryServicesMetricsManager.updateConnectionQueryServiceOpenConnectionHistogram(counter, + connectionQueryServiceName); + } +} diff --git a/phoenix-core/src/test/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesMetricsTest.java b/phoenix-core/src/test/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesMetricsTest.java new file mode 100644 index 0000000000..d05a6824b3 --- /dev/null +++ b/phoenix-core/src/test/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesMetricsTest.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.monitoring.connectionqueryservice; + +import org.apache.hadoop.conf.Configuration; +import org.apache.phoenix.monitoring.ConnectionQueryServicesMetric; +import org.apache.phoenix.monitoring.MetricType; +import org.junit.Test; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + + +import static org.apache.phoenix.monitoring.connectionqueryservice.ConnectionQueryServicesNameMetricsTest.openInternalPhoenixConnCounter; +import static org.apache.phoenix.monitoring.connectionqueryservice.ConnectionQueryServicesNameMetricsTest.openPhoenixConnCounter; +import static org.apache.phoenix.monitoring.connectionqueryservice.ConnectionQueryServicesNameMetricsTest.connectionQueryServiceNames; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class ConnectionQueryServicesMetricsTest { + static Map<String, ConnectionQueryServicesMetrics> phoenixConnectionQueryServiceSet = + new HashMap<>(); + + public boolean verifyConnectionQueryServiceName() { + + if (phoenixConnectionQueryServiceSet.isEmpty()) { + return false; + } + for (String connectionQueryServiceName : connectionQueryServiceNames) { + ConnectionQueryServicesMetrics instance = + phoenixConnectionQueryServiceSet.get(connectionQueryServiceName); + if (!instance.getConnectionQueryServiceName().equals(connectionQueryServiceName)) { + return false; + } + } + return true; + } + + public void verifyMetricsFromPhoenixConnectionQueryServiceMetrics() { + assertFalse(phoenixConnectionQueryServiceSet.isEmpty()); + for (int i = 0; i < connectionQueryServiceNames.length; i++) { + ConnectionQueryServicesMetrics instance = + phoenixConnectionQueryServiceSet.get(connectionQueryServiceNames[i]); + assertEquals(instance.getConnectionQueryServiceName(), connectionQueryServiceNames[i]); + List<ConnectionQueryServicesMetric> metricList = instance.getAllMetrics(); + for (ConnectionQueryServicesMetric metric : metricList) { + + if (metric.getMetricType() + .equals(MetricType.OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER)) { + assertEquals(openInternalPhoenixConnCounter[i], metric.getValue()); + } + if (metric.getMetricType().equals(MetricType.OPEN_PHOENIX_CONNECTIONS_COUNTER)) { + assertEquals(openPhoenixConnCounter[i], metric.getValue()); + } + } + } + } + + @Test + public void testPhoenixConnectionQueryServiceMetricsForPhoenixConnectionQueryServiceName() { + Configuration conf = new Configuration(); + for (int i = 0; i < connectionQueryServiceNames.length; i++) { + ConnectionQueryServicesMetrics instance = + new ConnectionQueryServicesMetrics(connectionQueryServiceNames[i], conf); + phoenixConnectionQueryServiceSet.put(connectionQueryServiceNames[i], instance); + } + assertTrue(verifyConnectionQueryServiceName()); + } + + /** + * This test is for changeMetricValue() Method and getMetricMap() + */ + @Test + public void testPhoenixConnectionQueryServiceMetrics() { + Configuration conf = new Configuration(); + for (int i = 0; i < connectionQueryServiceNames.length; i++) { + ConnectionQueryServicesMetrics instance = + new ConnectionQueryServicesMetrics(connectionQueryServiceNames[i], conf); + phoenixConnectionQueryServiceSet.put(connectionQueryServiceNames[i], instance); + + instance.setMetricValue(MetricType.OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER, + openInternalPhoenixConnCounter[i]); + instance.setMetricValue( + MetricType.OPEN_PHOENIX_CONNECTIONS_COUNTER, openPhoenixConnCounter[i]); + } + verifyMetricsFromPhoenixConnectionQueryServiceMetrics(); + phoenixConnectionQueryServiceSet.clear(); + } +} diff --git a/phoenix-core/src/test/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesNameMetricsTest.java b/phoenix-core/src/test/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesNameMetricsTest.java new file mode 100644 index 0000000000..6a1d26e05f --- /dev/null +++ b/phoenix-core/src/test/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesNameMetricsTest.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.monitoring.connectionqueryservice; + +import org.apache.phoenix.monitoring.ConnectionQueryServicesMetric; +import org.apache.phoenix.monitoring.MetricType; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.phoenix.monitoring.MetricType.OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER; +import static org.apache.phoenix.monitoring.MetricType.OPEN_PHOENIX_CONNECTIONS_COUNTER; +import static org.apache.phoenix.monitoring.MetricType.PHOENIX_CONNECTIONS_THROTTLED_COUNTER; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** + * This class is used primarily to populate data and + * verification methods + */ + +public class ConnectionQueryServicesNameMetricsTest { + + public static final String[] connectionQueryServiceNames = + { "USE_CASE_1", "USE_CASE_2", "USE_CASE_3" }; + public static Map<String, Map<MetricType, Long>>[] connectionQueryServiceNameMetricMap = + new Map[connectionQueryServiceNames.length]; + public static final long[] openPhoenixConnCounter = { 1, 1, 1 }; + public static final long[] openInternalPhoenixConnCounter = { 1, 1, 1 }; + public static final long[] phoenixConnThrottledCounter = { 1, 2, 3 }; + + + public void populateMetrics() { + for (int i = 0; i < connectionQueryServiceNameMetricMap.length; i++) { + connectionQueryServiceNameMetricMap[i] = new HashMap<>(); + } + for (int i = 0; i < connectionQueryServiceNames.length; i++) { + Map<MetricType, Long> metrics = new HashMap<>(); + metrics.put(OPEN_PHOENIX_CONNECTIONS_COUNTER, openPhoenixConnCounter[i]); + metrics.put( + OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER, openInternalPhoenixConnCounter[i]); + metrics.put(PHOENIX_CONNECTIONS_THROTTLED_COUNTER, phoenixConnThrottledCounter[i]); + + connectionQueryServiceNameMetricMap[i].put(connectionQueryServiceNames[i], metrics); + } + } + + public void verfiyCountOfConnectionQueryServices(int noOfConnectionQueryServiceName) { + Map<String, List<ConnectionQueryServicesMetric>> map = + ConnectionQueryServicesMetricsManager.getAllConnectionQueryServicesMetrics(); + assertFalse(map == null || map.isEmpty()); + for (int i = 0; i < noOfConnectionQueryServiceName; i++) { + assertTrue(map.containsKey(connectionQueryServiceNames[i])); + List<ConnectionQueryServicesMetric> connectionQueryServiceNameMetric = + map.get(connectionQueryServiceNames[i]); + for (ConnectionQueryServicesMetric metric : connectionQueryServiceNameMetric) { + if (metric.getMetricType().equals(OPEN_PHOENIX_CONNECTIONS_COUNTER)) { + assertEquals(openPhoenixConnCounter[i], metric.getValue()); + } + if (metric.getMetricType().equals(OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER)) { + assertEquals(openInternalPhoenixConnCounter[i], metric.getValue()); + } + if (metric.getMetricType().equals(PHOENIX_CONNECTIONS_THROTTLED_COUNTER)) { + assertEquals(phoenixConnThrottledCounter[i], metric.getValue()); + } + } + } + } + +}