This is an automated email from the ASF dual-hosted git repository. jisaac pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push: new f40921ce33 PHOENIX-7063 Track and account garbage collected phoenix connections (#1706) f40921ce33 is described below commit f40921ce33b51a76d941c0231a6c1acdac2edac4 Author: Lokesh Khurana <khuranalokes...@gmail.com> AuthorDate: Thu Oct 12 18:24:59 2023 -0700 PHOENIX-7063 Track and account garbage collected phoenix connections (#1706) Track and account garbage collected phoenix connections --------- Co-authored-by: Jacob Isaac <jis...@salesforce.com> Co-authored-by: Jacob Isaac <jacobpisaa...@gmail.com> Co-authored-by: Lokesh Khurana <lokesh.khur...@salesforce.com> --- .../phoenix/jdbc/LoggingConnectionLimiterIT.java | 349 +++++++++++++++++++++ .../phoenix/jdbc/LoggingHAConnectionLimiterIT.java | 155 +++++++++ .../jdbc/LoggingSingleConnectionLimiterIT.java | 135 ++++++++ .../org/apache/phoenix/jdbc/PhoenixConnection.java | 30 +- .../phoenix/jdbc/PhoenixMonitoredConnection.java | 2 +- .../org/apache/phoenix/jdbc/PhoenixStatement.java | 32 +- .../org/apache/phoenix/log/ActivityLogInfo.java | 62 ++++ .../apache/phoenix/log/BaseConnectionLimiter.java | 136 ++++++++ .../phoenix/log/ConnectionActivityLogger.java | 150 +++++++++ .../org/apache/phoenix/log/ConnectionLimiter.java | 39 +++ .../phoenix/log/DefaultConnectionLimiter.java | 64 ++++ .../phoenix/log/LoggingConnectionLimiter.java | 181 +++++++++++ .../phoenix/query/ConnectionQueryServices.java | 5 + .../phoenix/query/ConnectionQueryServicesImpl.java | 77 +++-- .../query/DelegateConnectionQueryServices.java | 5 + .../org/apache/phoenix/query/QueryServices.java | 4 + .../apache/phoenix/query/QueryServicesOptions.java | 10 +- 17 files changed, 1399 insertions(+), 37 deletions(-) diff --git a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/LoggingConnectionLimiterIT.java b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/LoggingConnectionLimiterIT.java new file mode 100644 index 0000000000..3a313f4973 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/LoggingConnectionLimiterIT.java @@ -0,0 +1,349 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.jdbc; + +import org.apache.commons.lang3.StringUtils; +import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest; +import org.apache.phoenix.log.ConnectionLimiter; +import org.apache.phoenix.log.LoggingConnectionLimiter; +import org.apache.phoenix.query.BaseTest; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.PreparedStatement; +import java.sql.Timestamp; +import java.util.HashSet; +import java.util.ArrayList; +import java.util.Map; +import java.util.Set; +import java.util.Random; + + +import java.time.Instant; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.Assert.*; + +@Category(NeedsOwnMiniClusterTest.class) +public abstract class LoggingConnectionLimiterIT extends BaseTest { + private static enum ActivityType { + QUERY, LOAD + } + //private static final Logger LOG = LoggerFactory.getLogger(LoggingConnectionLimiterIT.class); + private static final Instant NOW = Instant.now(); + private static final String tableName = generateUniqueName(); + private static final String UPSERT_SQL = "UPSERT INTO " + tableName + "(ORGANIZATION_ID, CLIENT_TYPE, GROUP_ID, MY_KEY, MY_VALUE, SIZE, NEXT_CHUNK, POD, CREATED_DATE, EXPIRY_DATE) values (?,?,?,?,?,?,?,?,?,?)"; + private static final String GROUP_CONDITION = "ORGANIZATION_ID=? and CLIENT_TYPE=? and GROUP_ID=?"; + private static final String KEY_CONDITION = GROUP_CONDITION + " and MY_KEY=?"; + + private static final String SELECT_KEY_SQL = "SELECT EXPIRY_DATE, NEXT_CHUNK, MY_VALUE, CREATED_DATE FROM " + tableName + " WHERE " + KEY_CONDITION; + protected static final String CREATE_TABLE_SQL = String.format("CREATE TABLE IF NOT EXISTS %s ( \n" + + " ORGANIZATION_ID CHAR(18) NOT NULL, \n" + + " CLIENT_TYPE VARCHAR NOT NULL, \n" + + " GROUP_ID VARCHAR NOT NULL, \n" + + " MY_KEY VARCHAR NOT NULL, \n" + + " MY_VALUE VARBINARY, \n" + + " SIZE INTEGER,\n" + + " NEXT_CHUNK BOOLEAN,\n" + + " POD VARCHAR, \n" + + " CREATED_DATE DATE,\n" + + " EXPIRY_DATE DATE,\n" + + " CONSTRAINT PK_DATA PRIMARY KEY \n" + + " ( \n" + + " ORGANIZATION_ID, \n" + + " CLIENT_TYPE, \n" + + " GROUP_ID, \n" + + " MY_KEY \n" + + " ) \n" + + ") IMMUTABLE_ROWS=true, VERSIONS=1, DISABLE_TABLE_SOR=true, REPLICATION_SCOPE=1, TTL=864000", tableName); + + protected static final String ORG_ID = "org000000000000001"; + protected static final String GROUP_ID = "groupId"; + + @Rule + public TestName testName = new TestName(); + + @Test + public void testWhenAllConnectionsClosed() throws Exception { + /** + * Case: when connections are closed + * Expectation: + * No failures due to throttling. + * # of not closed connection = 0 + * logs contain o=upserted (indicating upsert operation) + */ + + int loadFailures = runSampleActivity(ActivityType.LOAD, 10, 100, 10, 0); + ConnectionLimiter limiter = getConnectionLimiter(); + assertTrue(limiter instanceof LoggingConnectionLimiter); + int count = ((LoggingConnectionLimiter) limiter).getConnectionCount(); + assertTrue("Should not have any failures", loadFailures == 0); + assertTrue("Num connections not closed not as expected", count == 0); + Map<String, String> logs = ((LoggingConnectionLimiter) limiter).getActivityLog(); + for (Map.Entry<String, String> logEntry : logs.entrySet()) { + assertTrue(logEntry.getValue().contains("o=upserted")); + } + + } + + @Test + public void testActivityLogsOnUpsertsWhenNoFailures() throws Exception { + /** + * Case: when connections not closed is less than throttling threshold + * Expectation: + * No failures due to throttling. + * # of not closed connection = numDoNotClose + * logs contain o=upserted (indicating upsert operation) + */ + + int loadFailures = runSampleActivity(ActivityType.LOAD, 10, 100, 10, 10); + ConnectionLimiter limiter = getConnectionLimiter(); + assertTrue(limiter instanceof LoggingConnectionLimiter); + int count = ((LoggingConnectionLimiter) limiter).getConnectionCount(); + assertTrue("Should not have any failures", loadFailures == 0); + assertTrue("Num connections not closed not as expected", count == 10); + Map<String, String> logs = ((LoggingConnectionLimiter) limiter).getActivityLog(); + for (Map.Entry<String, String> logEntry : logs.entrySet()) { + assertTrue(logEntry.getValue().contains("o=upserted")); + } + } + + @Test + public void testActivityLogsOnQueryWhenNoFailures() throws Exception { + /** + * Case: when connections that are not closed is less than throttling threshold + * Expectation: + * No failures due to throttling. + * # of not closed connection = numDoNotClose + * logs contain o=queried (indicating query operation) + */ + int queryFailures = runSampleActivity(ActivityType.QUERY, 10, 100, 10, 10); + ConnectionLimiter limiter = getConnectionLimiter(); + assertTrue(limiter instanceof LoggingConnectionLimiter); + int count = ((LoggingConnectionLimiter) limiter).getConnectionCount(); + assertTrue("Should not have any failures", queryFailures == 0); + assertTrue("Num connections not closed not as expected", count == 10); + Map<String, String> logs = ((LoggingConnectionLimiter) limiter).getActivityLog(); + for (Map.Entry<String, String> logEntry : logs.entrySet()) { + assertTrue(logEntry.getValue().contains("o=queried")); + } + } + + @Test + public void testActivityLogsOnUpsertWhenFailures() throws Exception { + /** + * Case: when connections not closed is => throttling threshold + * Expectation: + * Some failures due to throttling. + * Throttling will kick when the # of active threads + # of not closed connection >= throttling threshold. + * logs contain o=upserted (indicating upsert operation) + */ + int loadFailures = runSampleActivity(ActivityType.LOAD, 10, 100, 10, 20); + ConnectionLimiter limiter = getConnectionLimiter(); + assertTrue(limiter instanceof LoggingConnectionLimiter); + int count = ((LoggingConnectionLimiter) limiter).getConnectionCount(); + assertTrue("Should have some failures", loadFailures > 0); + assertTrue(String.format("Num connections not closed not as expected [expected >= %d, actual = %d", 10, count), count >= 10); + Map<String, String> logs = ((LoggingConnectionLimiter) limiter).getActivityLog(); + for (Map.Entry<String, String> logEntry : logs.entrySet()) { + assertTrue(logEntry.getValue().contains("o=upserted")); + } + + } + + @Test + public void testActivityLogsOnQueryWhenFailures() throws Exception { + /** + * Case: when connections not closed is => throttling threshold + * Expectation: + * Some failures due to throttling. + * Throttling will kick when the # of active threads + # of not closed connection >= throttling threshold. + * logs contain o=queried (indicating query operation) + */ + int queryFailures = runSampleActivity(ActivityType.QUERY, 10, 100, 10, 20); + ConnectionLimiter limiter = getConnectionLimiter(); + assertTrue(limiter instanceof LoggingConnectionLimiter); + int count = ((LoggingConnectionLimiter) limiter).getConnectionCount(); + assertTrue("Should have some failures", queryFailures > 0); + assertTrue(String.format("Num connections not closed not as expected [expected >= %d, actual = %d", 10, count), count >= 10); + Map<String, String> logs = ((LoggingConnectionLimiter) limiter).getActivityLog(); + for (Map.Entry<String, String> logEntry : logs.entrySet()) { + assertTrue(logEntry.getValue().contains("o=queried")); + } + + } + + protected abstract ConnectionLimiter getConnectionLimiter() throws Exception ; + + protected int runSampleActivity(ActivityType activityType, int clientPool, int clientQueue, int numRows, int connNotClosed) throws Exception { + /** + * clientPool : number of active client threads + * clientQueue : total number of client calls per test run + */ + + Random rnd = new Random(); + + ExecutorService executorService = new ThreadPoolExecutor(clientPool, clientPool, 10, + TimeUnit.SECONDS, new ArrayBlockingQueue<>(clientQueue)); + + + ArrayList<CompletableFuture<String>> clientCallList = new ArrayList<>(); + ArrayList<CompletableFuture<?>> mayHaveFailedList = new ArrayList<>(); + CountDownLatch latch = new CountDownLatch(clientQueue); + Set<Integer> skipCloseForClients = new HashSet<>(); + for (int i = 0;i < clientQueue && connNotClosed > 0;i++) { + skipCloseForClients.add(rnd.nextInt(clientQueue)); + if (skipCloseForClients.size() == connNotClosed) break; + } + Set<Integer> skippedCloseForClients = new HashSet<>(); + for (int i = 0; i < clientQueue; i++) { + + CompletableFuture<String> mockCompletableFuture = new CompletableFuture<>(); + CompletableFuture<?> mayHaveException = mockCompletableFuture.whenCompleteAsync((r, e) -> { + String resultInfo = activityType + " activity started"; + int index = Integer.parseInt(r); + try { + int rowsToWrite = numRows; + String orgId = StringUtils.rightPad(BaseTest.generateUniqueName(), 15).substring(0, 15); + String groupId = testName.getMethodName(); + //LOG.info("Client : " + resultInfo + ":" + r); + Connection connection = getConnection(); + try { + connection.setAutoCommit(false); + switch (activityType) { + case LOAD: + loadData(connection, orgId, groupId, rowsToWrite, 20); + break; + case QUERY: + loadData(connection, orgId, groupId, rowsToWrite, 20); + queryData(connection, orgId, groupId); + break; + default: + break; + } + resultInfo = "activity completed"; + //LOG.info("Client : " + resultInfo + ":" + r); + } finally { + if (!skipCloseForClients.contains(index)) { + connection.close(); + } + if (skipCloseForClients.contains(index)) { + resultInfo = "skipped close"; + //LOG.info("Client : " + resultInfo + ":" + r); + skippedCloseForClients.add(Integer.valueOf(r)); + } + } + } catch (SQLException ex) { + resultInfo = "failed sqle"; + //LOG.error(resultInfo, ex); + throw new RuntimeException(ex); + } catch (Exception ex) { + resultInfo = "failed general"; + //LOG.error(resultInfo, ex); + throw new RuntimeException(ex); + } finally { + latch.countDown(); + } + }, executorService); + mayHaveFailedList.add(mayHaveException); + clientCallList.add(mockCompletableFuture); + } + + // Explicitly complete the future (client call) to invoke open and close methods. + for (int i = 0; i < clientCallList.size(); i++) { + clientCallList.get(i).complete(String.valueOf(i)); + } + // Wait for all client calls to finish + latch.await(); + executorService.shutdown(); + + AtomicInteger failedCount = new AtomicInteger(); + // Iterate thru client calls that may have failed. + mayHaveFailedList.forEach(cf -> { + cf.whenComplete((r, e) -> { + if (e != null) { + failedCount.incrementAndGet(); + //LOG.info("Failed message: " + e.getMessage()); + } else { + //LOG.info("Success message: " + r); + } + }); + }); + + return failedCount.get(); + } + + protected static void loadData(Connection connection, String orgId, String groupId, int rows, int batchSize) throws SQLException { + Integer counter = 0; + //See W-8064529 for reuse of the preparedstatement + for (int i = 0; i < rows; i++) { + try (PreparedStatement ps = connection.prepareStatement(UPSERT_SQL)) { + ps.setString(1, orgId); + ps.setString(2, "CLIENT_TYPE"); + ps.setString(3, groupId); + ps.setString(4, String.valueOf(counter++)); + ps.setBytes(5, new byte[]{counter.byteValue()}); + ps.setInt(6, 1); + ps.setBoolean(7, false); + ps.setString(8, "pod"); + ps.setTimestamp(9, Timestamp.from(NOW)); + ps.setTimestamp(10, Timestamp.from(NOW.plusSeconds(3600))); + int result = ps.executeUpdate(); + if (result != 1) { + throw new RuntimeException("Phoenix error: upsert count is not one. It is " + result); + } + } + if (!connection.getAutoCommit() && counter % batchSize == 0) { + connection.commit(); + } + } + if (!connection.getAutoCommit()) { + connection.commit(); //send any remaining rows + } + } + + protected void queryData(Connection connection, String orgId, String groupId) throws SQLException { + try (PreparedStatement statement = connection.prepareStatement(SELECT_KEY_SQL)) { + + statement.setString(1, orgId); + statement.setString(2, "CLIENT_TYPE"); + statement.setString(3, groupId); + statement.setString(4, "3"); + + ResultSet rs = statement.executeQuery(); + assertTrue(rs.next()); + //counter gets incremented prior to putting value so 3+1=4 + assertArrayEquals(new byte[]{Integer.valueOf(4).byteValue()}, rs.getBytes(3)); + assertFalse(rs.next()); + } + } + + protected abstract Connection getConnection() throws SQLException; + +} diff --git a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/LoggingHAConnectionLimiterIT.java b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/LoggingHAConnectionLimiterIT.java new file mode 100644 index 0000000000..c4075ffdd5 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/LoggingHAConnectionLimiterIT.java @@ -0,0 +1,155 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.jdbc; + +import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest; +import org.apache.phoenix.log.ConnectionLimiter; +import org.apache.phoenix.query.ConnectionQueryServices; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.thirdparty.com.google.common.collect.Lists; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import static org.apache.phoenix.jdbc.HighAvailabilityGroup.PHOENIX_HA_GROUP_ATTR; +import static org.apache.phoenix.jdbc.HighAvailabilityPolicy.PARALLEL; + +@Category(NeedsOwnMiniClusterTest.class) +public class LoggingHAConnectionLimiterIT extends LoggingConnectionLimiterIT { + private static final Logger LOG = LoggerFactory.getLogger(LoggingHAConnectionLimiterIT.class); + private static final HighAvailabilityTestingUtility.HBaseTestingUtilityPair CLUSTERS = new HighAvailabilityTestingUtility.HBaseTestingUtilityPair(); + private static Map<String, String> GLOBAL_PROPERTIES ; + + private static List<Connection> CONNECTIONS = null; + + /** + * Client properties to create a connection per test. + */ + private Properties clientProperties; + /** + * JDBC connection string for this test HA group. + */ + private String jdbcUrl; + /** + * HA group for this test. + */ + private HighAvailabilityGroup haGroup; + + @BeforeClass + public static final void doSetup() throws Exception { + /** + * Turn on the connection logging feature + * CLIENT_CONNECTION_MAX_ALLOWED_CONNECTIONS : max allowed connections before throttling + * INTERNAL_CONNECTION_MAX_ALLOWED_CONNECTIONS : max allowed internal connections before throttling + * HA_MAX_POOL_SIZE : HA thread pool size for open and other activities + * HA_MAX_QUEUE_SIZE : Queue size of the core thread pool + * + */ + GLOBAL_PROPERTIES = new HashMap<String, String>() {{ + put(QueryServices.CONNECTION_ACTIVITY_LOGGING_ENABLED, String.valueOf(true)); + put(QueryServices.CLIENT_CONNECTION_MAX_ALLOWED_CONNECTIONS, String.valueOf(20)); + put(QueryServices.INTERNAL_CONNECTION_MAX_ALLOWED_CONNECTIONS, String.valueOf(20)); + put(PhoenixHAExecutorServiceProvider.HA_MAX_POOL_SIZE, String.valueOf(5)); + put(PhoenixHAExecutorServiceProvider.HA_MAX_QUEUE_SIZE, String.valueOf(30)); + + }}; + + CLUSTERS.start(); + DriverManager.registerDriver(PhoenixDriver.INSTANCE); + GLOBAL_PROPERTIES.put(PHOENIX_HA_GROUP_ATTR, PARALLEL.name()); + + + CONNECTIONS = Lists.newArrayList(CLUSTERS.getCluster1Connection(), CLUSTERS.getCluster2Connection()); + LOG.info(String.format("************* Num connections : %d", CONNECTIONS.size())); + + for (Connection conn : CONNECTIONS) { + try (Statement statement = conn.createStatement()) { + statement.execute(CREATE_TABLE_SQL); + } + conn.commit(); + } + + //preload some data + try (Connection connection = CLUSTERS.getCluster1Connection()) { + loadData(connection, ORG_ID, GROUP_ID, 100, 20); + } + } + + + @AfterClass + public static void tearDownAfterClass() throws Exception { + for (Connection conn : CONNECTIONS) { + conn.close(); + } + + DriverManager.deregisterDriver(PhoenixDriver.INSTANCE); + CLUSTERS.close(); + } + + @Before + public void setup() throws Exception { + clientProperties = new Properties(); + clientProperties.putAll(GLOBAL_PROPERTIES); + + String haGroupName = testName.getMethodName(); + clientProperties.setProperty(PHOENIX_HA_GROUP_ATTR, haGroupName); + + // Make first cluster ACTIVE + CLUSTERS.initClusterRole(haGroupName, PARALLEL); + + jdbcUrl = String.format("jdbc:phoenix:[%s|%s]", + CLUSTERS.getUrl1(), CLUSTERS.getUrl2()); + haGroup = HighAvailabilityTestingUtility.getHighAvailibilityGroup(jdbcUrl, clientProperties); + LOG.info("Initialized haGroup {} with URL {}", haGroup.getGroupInfo().getName(), jdbcUrl); + + } + + @Override + protected ConnectionLimiter getConnectionLimiter() throws Exception { + ConnectionQueryServices cqs = null; + Connection testConnection = null; + try { + testConnection = getConnection(); + ParallelPhoenixConnection phoenixConnection = testConnection.unwrap(ParallelPhoenixConnection.class); + cqs = phoenixConnection.getFutureConnection1().get().getQueryServices(); + return cqs.getConnectionLimiter(); + } finally { + if (testConnection != null) testConnection.close(); + } + } + + @Override + protected Connection getConnection() throws SQLException { + Connection connection = DriverManager.getConnection(jdbcUrl, clientProperties); + connection.setAutoCommit(true); + return connection; + } + +} diff --git a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/LoggingSingleConnectionLimiterIT.java b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/LoggingSingleConnectionLimiterIT.java new file mode 100644 index 0000000000..fb0ad2871d --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/LoggingSingleConnectionLimiterIT.java @@ -0,0 +1,135 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.jdbc; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest; +import org.apache.phoenix.log.ConnectionLimiter; +import org.apache.phoenix.query.ConfigurationFactory; +import org.apache.phoenix.query.ConnectionQueryServices; +import org.apache.phoenix.query.HBaseFactoryProvider; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.util.InstanceResolver; +import org.apache.phoenix.util.PhoenixRuntime; +import org.junit.BeforeClass; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Properties; + +@Category(NeedsOwnMiniClusterTest.class) +public class LoggingSingleConnectionLimiterIT extends LoggingConnectionLimiterIT { + private static final Logger LOG = LoggerFactory.getLogger(LoggingSingleConnectionLimiterIT.class); + + @BeforeClass + public static void doSetup() throws Exception { + /** + * Turn on the connection logging feature + * CLIENT_CONNECTION_MAX_ALLOWED_CONNECTIONS : max allowed connections before throttling + * INTERNAL_CONNECTION_MAX_ALLOWED_CONNECTIONS : max allowed internal connections before throttling + * HA_MAX_POOL_SIZE : HA thread pool size for open and other activities + * HA_MAX_QUEUE_SIZE : Queue size of the core thread pool + */ + + InstanceResolver.clearSingletons(); + // Override to get required config for static fields loaded that require HBase config + InstanceResolver.getSingleton(ConfigurationFactory.class, new ConfigurationFactory() { + + @Override public Configuration getConfiguration() { + Configuration conf = HBaseConfiguration.create(); + conf.set(QueryServices.CONNECTION_ACTIVITY_LOGGING_ENABLED, String.valueOf(true)); + conf.set(QueryServices.CLIENT_CONNECTION_MAX_ALLOWED_CONNECTIONS, String.valueOf(20)); + conf.set(QueryServices.INTERNAL_CONNECTION_MAX_ALLOWED_CONNECTIONS, String.valueOf(20)); + conf.set(PhoenixHAExecutorServiceProvider.HA_MAX_POOL_SIZE, String.valueOf(5)); + conf.set(PhoenixHAExecutorServiceProvider.HA_MAX_QUEUE_SIZE, String.valueOf(30)); + return conf; + } + + @Override public Configuration getConfiguration(Configuration confToClone) { + Configuration conf = HBaseConfiguration.create(); + conf.set(QueryServices.CONNECTION_ACTIVITY_LOGGING_ENABLED, String.valueOf(true)); + conf.set(QueryServices.CLIENT_CONNECTION_MAX_ALLOWED_CONNECTIONS, String.valueOf(20)); + conf.set(QueryServices.INTERNAL_CONNECTION_MAX_ALLOWED_CONNECTIONS, String.valueOf(20)); + conf.set(PhoenixHAExecutorServiceProvider.HA_MAX_POOL_SIZE, String.valueOf(5)); + conf.set(PhoenixHAExecutorServiceProvider.HA_MAX_QUEUE_SIZE, String.valueOf(30)); + Configuration copy = new Configuration(conf); + copy.addResource(confToClone); + return copy; + } + }); + + Configuration conf = HBaseFactoryProvider.getConfigurationFactory().getConfiguration(); + HBaseTestingUtility hBaseTestingUtility = new HBaseTestingUtility(conf); + setUpConfigForMiniCluster(conf); + hBaseTestingUtility.startMiniCluster(); + // establish url and quorum. Need to use PhoenixDriver and not PhoenixTestDriver + String zkQuorum = "localhost:" + hBaseTestingUtility.getZkCluster().getClientPort(); + url = PhoenixRuntime.JDBC_PROTOCOL + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkQuorum; + DriverManager.registerDriver(PhoenixDriver.INSTANCE); + + String profileName = "setup"; + final String urlWithPrinc = url + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + profileName + + PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR; + Properties props = new Properties(); + + try (Connection connection = DriverManager.getConnection(urlWithPrinc, props)) { + try (Statement statement = connection.createStatement()) { + statement.execute(CREATE_TABLE_SQL); + } + connection.commit(); + } + + //preload some data + try (Connection connection = DriverManager.getConnection(urlWithPrinc, props)) { + loadData(connection, ORG_ID, GROUP_ID, 100, 20); + } + + } + @Override + protected ConnectionLimiter getConnectionLimiter() throws Exception { + ConnectionQueryServices cqs = null; + Connection testConnection = null; + try { + testConnection = getConnection(); + PhoenixConnection phoenixConnection = testConnection.unwrap(PhoenixConnection.class); + cqs = phoenixConnection.getQueryServices(); + return cqs.getConnectionLimiter(); + } finally { + if (testConnection != null) testConnection.close(); + } + } + + @Override + protected Connection getConnection() throws SQLException { + String profileName = testName.getMethodName(); + final String urlWithPrinc = url + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + profileName + + PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR; + Properties props = new Properties(); + Connection connection = DriverManager.getConnection(urlWithPrinc, props); + connection.setAutoCommit(true); + return connection; + } + +} diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java index 2d88a142d2..493fdf5037 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java @@ -50,12 +50,15 @@ import java.sql.Savepoint; import java.sql.Statement; import java.sql.Struct; import java.text.Format; + + import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.HashMap; import java.util.Properties; +import java.util.UUID; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executor; import java.util.concurrent.LinkedBlockingQueue; @@ -80,6 +83,8 @@ import org.apache.phoenix.iterate.ParallelIteratorFactory; import org.apache.phoenix.iterate.TableResultIterator; import org.apache.phoenix.iterate.TableResultIteratorFactory; import org.apache.phoenix.jdbc.PhoenixStatement.PhoenixStatementParser; +import org.apache.phoenix.log.ActivityLogInfo; +import org.apache.phoenix.log.ConnectionActivityLogger; import org.apache.phoenix.log.LogLevel; import org.apache.phoenix.monitoring.MetricType; import org.apache.phoenix.monitoring.TableMetricsManager; @@ -192,6 +197,8 @@ public class PhoenixConnection implements MetaDataMutated, SQLCloseable, Phoenix //public interfaces. private final boolean isInternalConnection; private boolean isApplyTimeZoneDisplacement; + private final UUID uniqueID; + private ConnectionActivityLogger connectionActivityLogger = ConnectionActivityLogger.NO_OP_LOGGER; static { Tracing.addTraceMetricsSource(); @@ -379,7 +386,7 @@ public class PhoenixConnection implements MetaDataMutated, SQLCloseable, Phoenix this.services.getProps()); this.mutationState = mutationState == null ? newMutationState(maxSize, maxSizeBytes) : new MutationState(mutationState, this); - + this.uniqueID = UUID.randomUUID(); this.services.addConnection(this); // setup tracing, if its enabled @@ -957,6 +964,10 @@ public class PhoenixConnection implements MetaDataMutated, SQLCloseable, Phoenix return new PhoenixDatabaseMetaData(this); } + public UUID getUniqueID() { + return this.uniqueID; + } + @Override public int getTransactionIsolation() throws SQLException { boolean transactionsEnabled = getQueryServices().getProps().getBoolean( @@ -1303,6 +1314,9 @@ public class PhoenixConnection implements MetaDataMutated, SQLCloseable, Phoenix public void incrementStatementExecutionCounter() { statementExecutionCounter++; + if (connectionActivityLogger.isLevelEnabled(ActivityLogInfo.OP_STMTS.getLogLevel())) { + connectionActivityLogger.log(ActivityLogInfo.OP_STMTS, String.valueOf(statementExecutionCounter)); + } } public TraceScope getTraceScope() { @@ -1431,4 +1445,16 @@ public class PhoenixConnection implements MetaDataMutated, SQLCloseable, Phoenix public boolean isApplyTimeZoneDisplacement() { return isApplyTimeZoneDisplacement; } + + public String getActivityLog() { + return getActivityLogger().getActivityLog(); + } + + public ConnectionActivityLogger getActivityLogger() { + return this.connectionActivityLogger; + } + + public void setActivityLogger(ConnectionActivityLogger connectionActivityLogger) { + this.connectionActivityLogger = connectionActivityLogger; + } } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixMonitoredConnection.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixMonitoredConnection.java index 5f94689864..d017f7bbcc 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixMonitoredConnection.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixMonitoredConnection.java @@ -21,7 +21,6 @@ package org.apache.phoenix.jdbc; import org.apache.phoenix.monitoring.MetricType; import java.sql.Connection; -import java.sql.ResultSet; import java.util.Map; /** @@ -48,4 +47,5 @@ public interface PhoenixMonitoredConnection extends Connection { * metrics for individual DML. */ void clearMetrics(); + } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java index d73f1cbc15..1996577fcb 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java @@ -113,6 +113,7 @@ import org.apache.phoenix.iterate.ExplainTable; import org.apache.phoenix.iterate.MaterializedResultIterator; import org.apache.phoenix.iterate.ParallelScanGrouper; import org.apache.phoenix.iterate.ResultIterator; +import org.apache.phoenix.log.ActivityLogInfo; import org.apache.phoenix.log.AuditQueryLogger; import org.apache.phoenix.log.LogLevel; import org.apache.phoenix.log.QueryLogInfo; @@ -276,10 +277,13 @@ public class PhoenixStatement implements PhoenixMonitoredStatement, SQLCloseable protected final PhoenixConnection connection; private static final int NO_UPDATE = -1; + private static final String TABLE_UNKNOWN = ""; private List<PhoenixResultSet> resultSets = new ArrayList<PhoenixResultSet>(); private QueryPlan lastQueryPlan; private PhoenixResultSet lastResultSet; private int lastUpdateCount = NO_UPDATE; + + private String lastUpdateTable = TABLE_UNKNOWN; private Operation lastUpdateOperation; private boolean isClosed = false; private int maxRows; @@ -397,6 +401,7 @@ public class PhoenixStatement implements PhoenixMonitoredStatement, SQLCloseable setLastQueryPlan(plan); setLastResultSet(rs); setLastUpdateCount(NO_UPDATE); + setLastUpdateTable(tableName == null ? TABLE_UNKNOWN : tableName); setLastUpdateOperation(stmt.getOperation()); // If transactional, this will move the read pointer forward if (connection.getAutoCommit() && !noCommit) { @@ -568,6 +573,7 @@ public class PhoenixStatement implements PhoenixMonitoredStatement, SQLCloseable int lastUpdateCount = (int) Math.min(Integer.MAX_VALUE, lastState.getUpdateCount()); setLastUpdateCount(lastUpdateCount); setLastUpdateOperation(stmt.getOperation()); + setLastUpdateTable(tableName == null ? TABLE_UNKNOWN : tableName); connection.incrementStatementExecutionCounter(); if (queryLogger.isAuditLoggingEnabled()) { queryLogger.log(QueryLogInfo.TABLE_NAME_I, getTargetForAudit(stmt)); @@ -2477,12 +2483,31 @@ public class PhoenixStatement implements PhoenixMonitoredStatement, SQLCloseable this.lastUpdateCount = lastUpdateCount; } + private String getLastUpdateTable() { + return lastUpdateTable; + } + + private void setLastUpdateTable(String lastUpdateTable) { + if (!Strings.isNullOrEmpty(lastUpdateTable)) { + this.lastUpdateTable = lastUpdateTable; + } + if (getConnection().getActivityLogger().isLevelEnabled(ActivityLogInfo.TABLE_NAME.getLogLevel())) { + updateActivityOnConnection(ActivityLogInfo.TABLE_NAME, this.lastUpdateTable); + } + } + private Operation getLastUpdateOperation() { return lastUpdateOperation; } private void setLastUpdateOperation(Operation lastUpdateOperation) { this.lastUpdateOperation = lastUpdateOperation; + if (getConnection().getActivityLogger().isLevelEnabled(ActivityLogInfo.OP_NAME.getLogLevel())) { + updateActivityOnConnection(ActivityLogInfo.OP_NAME, this.lastUpdateOperation.toString()); + } + if (getConnection().getActivityLogger().isLevelEnabled(ActivityLogInfo.OP_TIME.getLogLevel())) { + updateActivityOnConnection(ActivityLogInfo.OP_TIME, String.valueOf(EnvironmentEdgeManager.currentTimeMillis())); + } } private QueryPlan getLastQueryPlan() { @@ -2491,8 +2516,13 @@ public class PhoenixStatement implements PhoenixMonitoredStatement, SQLCloseable private void setLastQueryPlan(QueryPlan lastQueryPlan) { this.lastQueryPlan = lastQueryPlan; + } - + + private void updateActivityOnConnection(ActivityLogInfo item, String value) { + getConnection().getActivityLogger().log(item, value); + } + private void throwIfUnallowedUserDefinedFunctions(Map<String, UDFParseNode> udfParseNodes) throws SQLException { if (!connection .getQueryServices() diff --git a/phoenix-core/src/main/java/org/apache/phoenix/log/ActivityLogInfo.java b/phoenix-core/src/main/java/org/apache/phoenix/log/ActivityLogInfo.java new file mode 100644 index 0000000000..3baf44754d --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/log/ActivityLogInfo.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.log; + +import org.apache.phoenix.schema.types.PDataType; +import org.apache.phoenix.schema.types.PInteger; +import org.apache.phoenix.schema.types.PTimestamp; +import org.apache.phoenix.schema.types.PVarchar; + +import java.util.EnumSet; + + +public enum ActivityLogInfo { + + START_TIME("s", LogLevel.INFO,PTimestamp.INSTANCE), + OP_TIME("u", LogLevel.INFO,PTimestamp.INSTANCE), + TENANT_ID("t", LogLevel.INFO,PVarchar.INSTANCE), + CQS_NAME("p", LogLevel.INFO,PVarchar.INSTANCE), + REQUEST_ID("r", LogLevel.INFO,PVarchar.INSTANCE), + TABLE_NAME("n", LogLevel.INFO,PVarchar.INSTANCE), + OP_NAME("o", LogLevel.INFO,PVarchar.INSTANCE), + OP_STMTS("#", LogLevel.INFO, PInteger.INSTANCE); + + public final String shortName; + public final LogLevel logLevel; + public final PDataType dataType; + + private ActivityLogInfo(String shortName, LogLevel logLevel, PDataType dataType) { + this.shortName = shortName; + this.logLevel=logLevel; + this.dataType=dataType; + } + + public String getShortName() { + return shortName; + } + + public LogLevel getLogLevel() { + return logLevel; + } + + public PDataType getDataType() { + return dataType; + } + + +} diff --git a/phoenix-core/src/main/java/org/apache/phoenix/log/BaseConnectionLimiter.java b/phoenix-core/src/main/java/org/apache/phoenix/log/BaseConnectionLimiter.java new file mode 100644 index 0000000000..ad720c6155 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/log/BaseConnectionLimiter.java @@ -0,0 +1,136 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.log; + +import org.apache.phoenix.exception.SQLExceptionCode; +import org.apache.phoenix.exception.SQLExceptionInfo; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.monitoring.MetricType; +import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions; +import org.apache.phoenix.thirdparty.com.google.common.base.Strings; +import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting; + +import javax.annotation.concurrent.GuardedBy; +import java.sql.SQLException; + +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_PHOENIX_CONNECTIONS_THROTTLED_COUNTER; + +/** + * A base class for concrete implementation of ConnectionLimiter. + * 1. Should be called only when holding the ConnectionQueryServicesImpl.connectionCountLock + * 2. Does the basic accounting on open and close connection calls. + * 3. If connection throttling is enabled checks and calls onLimit if the threshold is breached. + */ +public abstract class BaseConnectionLimiter implements ConnectionLimiter { + protected int connectionCount = 0; + protected int internalConnectionCount = 0; + protected String profileName; + protected int maxConnectionsAllowed; + protected int maxInternalConnectionsAllowed; + protected boolean shouldThrottleNumConnections; + + protected BaseConnectionLimiter(String profileName, boolean shouldThrottleNumConnections, int maxConnectionsAllowed, int maxInternalConnectionsAllowed) { + this.profileName = profileName; + this.shouldThrottleNumConnections = shouldThrottleNumConnections; + this.maxConnectionsAllowed = maxConnectionsAllowed; + this.maxInternalConnectionsAllowed = maxInternalConnectionsAllowed; + } + @Override + @GuardedBy("ConnectionQueryServicesImpl.connectionCountLock") + public void acquireConnection(PhoenixConnection connection) throws SQLException { + Preconditions.checkNotNull(connection.getUniqueID(), "Got null UUID for Phoenix Connection!"); + + /* + * If we are throttling connections internal connections and client created connections + * are counted separately against each respective quota. + */ + if (shouldThrottleNumConnections) { + int futureConnections = 1 + ( connection.isInternalConnection() ? internalConnectionCount : connectionCount); + int allowedConnections = connection.isInternalConnection() ? maxInternalConnectionsAllowed : maxConnectionsAllowed; + // if throttling threshold is reached, try reclaiming garbage collected phoenix connections. + if ((allowedConnections != 0) && (futureConnections > allowedConnections) && (onSweep(connection.isInternalConnection()) == 0)) { + GLOBAL_PHOENIX_CONNECTIONS_THROTTLED_COUNTER.increment(); + + //TODO:- After PHOENIX-7038 per profile Phoenix Throttled Counter should be updated here. + + // Let the concrete classes handle the onLimit. + // They can either throw the exception back or handle it. + SQLException connectionThrottledException = connection.isInternalConnection() ? + new SQLExceptionInfo.Builder(SQLExceptionCode.NEW_INTERNAL_CONNECTION_THROTTLED). + build().buildException() : + new SQLExceptionInfo.Builder(SQLExceptionCode.NEW_CONNECTION_THROTTLED). + build().buildException(); + throw connectionThrottledException; + } + } + + if (connection.isInternalConnection()) { + internalConnectionCount++; + } else { + connectionCount++; + } + + } + + @Override + @GuardedBy("ConnectionQueryServicesImpl.connectionCountLock") + public void returnConnection(PhoenixConnection connection) { + if (connection.isInternalConnection() && internalConnectionCount > 0) { + --internalConnectionCount; + } else if (!connection.isInternalConnection() && connectionCount > 0) { + --connectionCount; + } + } + + @Override + @GuardedBy("ConnectionQueryServicesImpl.connectionCountLock") + public boolean isLastConnection() { + return connectionCount + internalConnectionCount - 1 <= 0; + } + @Override + public boolean isShouldThrottleNumConnections() { + return shouldThrottleNumConnections; + } + + @VisibleForTesting + @GuardedBy("ConnectionQueryServicesImpl.connectionCountLock") + public int getConnectionCount() { + return connectionCount; + } + + @Override + public int onSweep(boolean internal) { + return 0; + } + + @VisibleForTesting + @GuardedBy("ConnectionQueryServicesImpl.connectionCountLock") + public int getInternalConnectionCount() { + return internalConnectionCount; + } + + public int getMaxConnectionsAllowed() { + return maxConnectionsAllowed; + } + + public int getMaxInternalConnectionsAllowed() { + return maxInternalConnectionsAllowed; + } + +} + diff --git a/phoenix-core/src/main/java/org/apache/phoenix/log/ConnectionActivityLogger.java b/phoenix-core/src/main/java/org/apache/phoenix/log/ConnectionActivityLogger.java new file mode 100644 index 0000000000..ee2a2a8af4 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/log/ConnectionActivityLogger.java @@ -0,0 +1,150 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.log; + +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.schema.PName; +import org.apache.phoenix.thirdparty.com.google.common.base.Strings; +import org.apache.phoenix.util.EnvironmentEdgeManager; + +import java.lang.ref.WeakReference; +import java.util.List; +import java.util.UUID; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.Stream; + + +/** + * Logger for connection related activities. + * See also {@link ActivityLogInfo} + */ +public class ConnectionActivityLogger { + private LogLevel logLevel; + private boolean isInternalConnection; + private UUID connectionID; + private WeakReference<PhoenixConnection> connectionReference; + List<String> activityList = Stream.of(ActivityLogInfo.values()).map(f -> "").collect(Collectors.toList()); + + public ConnectionActivityLogger(PhoenixConnection connection, LogLevel level) { + logLevel = level; + this.isInternalConnection = connection.isInternalConnection(); + this.connectionID = connection.getUniqueID(); + this.connectionReference = new WeakReference<PhoenixConnection>(connection); + connection.setActivityLogger(this); + log(ActivityLogInfo.START_TIME, String.valueOf(EnvironmentEdgeManager.currentTimeMillis())); + PName tenantName = connection.getTenantId(); + if (tenantName != null) { + log(ActivityLogInfo.TENANT_ID, tenantName.getString()); + } + // TODO: CQS_NAME (Connection Profile Name) + + } + + public ConnectionActivityLogger() { + logLevel = LogLevel.OFF; + } + + public static final ConnectionActivityLogger NO_OP_LOGGER = new ConnectionActivityLogger() { + + @Override + public void log(ActivityLogInfo ActivityLogInfo, String info) {} + + @Override + public boolean isDebugEnabled() { + return false; + } + + @Override + public boolean isInfoEnabled() { + return false; + } + + @Override + public boolean isLevelEnabled(LogLevel logLevel) { + return false; + } + + @Override + public boolean isInternalConnection() { + return false; + } + + @Override + public PhoenixConnection getConnection() { return null; } + + @Override + public String getActivityLog() {return "";} + + @Override + public String getConnectionID() {return "";} + + }; + + public String getConnectionID() { + return connectionID.toString(); + } + + public boolean isInternalConnection() { + return isInternalConnection; + } + + public PhoenixConnection getConnection() { + return connectionReference.get(); + } + + /** + * Set logging info for a given activity + */ + public void log(ActivityLogInfo activity, String info) { + if (logLevel == LogLevel.OFF) return; + activityList.set(activity.ordinal(), info); + } + + /** + * Get the formatted log for external logging. + */ + public String getActivityLog() { + return IntStream + .range(0, ActivityLogInfo.values().length) + .filter((i) -> {return !Strings.isNullOrEmpty(activityList.get(i)) && isLevelEnabled(ActivityLogInfo.values()[i].getLogLevel());}) + .mapToObj(i -> new StringBuilder().append(ActivityLogInfo.values()[i].shortName).append("=").append(activityList.get(i)).toString()) + .collect(Collectors.joining(", ")); + } + /** + * Is Info logging currently enabled? + * Call this method to prevent having to perform expensive operations (for example, String concatenation) when the log level is more than info. + * @return + */ + public boolean isInfoEnabled(){ + return isLevelEnabled(LogLevel.INFO); + } + + /** + * Is debug logging currently enabled? + * Call this method to prevent having to perform expensive operations (for example, String concatenation) when the log level is more than debug. + */ + public boolean isDebugEnabled(){ + return isLevelEnabled(LogLevel.DEBUG); + } + + public boolean isLevelEnabled(LogLevel logLevel) { + return this.logLevel != null && logLevel != LogLevel.OFF ? logLevel.ordinal() <= this.logLevel.ordinal() + : false; + } +} diff --git a/phoenix-core/src/main/java/org/apache/phoenix/log/ConnectionLimiter.java b/phoenix-core/src/main/java/org/apache/phoenix/log/ConnectionLimiter.java new file mode 100644 index 0000000000..3bc6d465b2 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/log/ConnectionLimiter.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.log; +import org.apache.phoenix.jdbc.PhoenixConnection; + +import java.sql.SQLException; + +/** + * This interface defines the contract for storing information about Phoenix connections + * for debugging client-side issues like + * {@link org.apache.phoenix.exception.SQLExceptionCode#NEW_CONNECTION_THROTTLED} + */ +public interface ConnectionLimiter { + + void acquireConnection(PhoenixConnection connection) throws SQLException; + + void returnConnection(PhoenixConnection connection); + + int onSweep(boolean internal) ; + + boolean isLastConnection(); + + boolean isShouldThrottleNumConnections(); +} diff --git a/phoenix-core/src/main/java/org/apache/phoenix/log/DefaultConnectionLimiter.java b/phoenix-core/src/main/java/org/apache/phoenix/log/DefaultConnectionLimiter.java new file mode 100644 index 0000000000..2045ef510e --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/log/DefaultConnectionLimiter.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.log; + +import org.apache.phoenix.jdbc.PhoenixConnection; + +import java.sql.SQLException; + +/** + * Default implementation of a ConnectionLimiter. + */ +public class DefaultConnectionLimiter extends BaseConnectionLimiter { + private DefaultConnectionLimiter(Builder builder) { + super(builder.profileName, builder.shouldThrottleNumConnections, builder.maxConnectionsAllowed, builder.maxInternalConnectionsAllowed); + } + + public static class Builder { + protected String profileName; + protected int maxConnectionsAllowed; + protected int maxInternalConnectionsAllowed; + protected boolean shouldThrottleNumConnections; + + public Builder(boolean shouldThrottleNumConnections) { + this.shouldThrottleNumConnections = shouldThrottleNumConnections; + } + + public DefaultConnectionLimiter.Builder withConnectionProfile(String profileName) { + this.profileName = profileName; + return this; + } + + public DefaultConnectionLimiter.Builder withMaxAllowed(int maxAllowed) { + this.maxConnectionsAllowed = maxAllowed; + return this; + } + + public DefaultConnectionLimiter.Builder withMaxInternalAllowed(int maxInternalAllowed) { + this.maxInternalConnectionsAllowed = maxInternalAllowed; + return this; + } + + + public ConnectionLimiter build() { + return new DefaultConnectionLimiter(this); + } + + } + +} diff --git a/phoenix-core/src/main/java/org/apache/phoenix/log/LoggingConnectionLimiter.java b/phoenix-core/src/main/java/org/apache/phoenix/log/LoggingConnectionLimiter.java new file mode 100644 index 0000000000..e93a8a7f88 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/log/LoggingConnectionLimiter.java @@ -0,0 +1,181 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.log; + +import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting; +import org.apache.phoenix.thirdparty.com.google.common.collect.Maps; +import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.SQLException; +import java.util.Iterator; +import java.util.Map; +import java.util.UUID; + +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_OPEN_INTERNAL_PHOENIX_CONNECTIONS; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_OPEN_PHOENIX_CONNECTIONS; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_PHOENIX_CONNECTIONS_THROTTLED_COUNTER; + +/** + * An implementation of a ConnectionLimiter which logs activity info at configured intervals + * for the active connections in the map when throttling threshold is reached. + */ +public class LoggingConnectionLimiter extends BaseConnectionLimiter { + private static final Logger LOGGER = LoggerFactory.getLogger(LoggingConnectionLimiter.class); + private static long MIN_IN_MILLIS = 60 * 1000; + protected final boolean enableActivityLogging; + protected final long loggingIntervalInMillis; + protected long lastLoggedTimeInMillis; + protected long lastCollectedTimeInMillis; + // Map Phoenix connection UUID to its connection activity logger object + protected final Map<UUID, ConnectionActivityLogger> openConnectionActivityLoggers; + + private LoggingConnectionLimiter(Builder builder) { + super(builder.profileName, builder.shouldThrottleNumConnections, builder.maxConnectionsAllowed, builder.maxInternalConnectionsAllowed); + this.enableActivityLogging = builder.enableActivityLogging; + this.loggingIntervalInMillis = builder.loggingIntervalInMins * MIN_IN_MILLIS; + this.lastCollectedTimeInMillis = this.lastLoggedTimeInMillis = System.currentTimeMillis(); + this.openConnectionActivityLoggers = Maps.newHashMap(); + } + + @Override + public void acquireConnection(PhoenixConnection connection) throws SQLException { + super.acquireConnection(connection); + if ((this.enableActivityLogging) && (this.openConnectionActivityLoggers.size() < this.maxConnectionsAllowed + this.maxInternalConnectionsAllowed)) { + ConnectionActivityLogger logger = new ConnectionActivityLogger(connection, LogLevel.INFO); + this.openConnectionActivityLoggers.put(connection.getUniqueID(), logger); + } + } + + @Override + public void returnConnection(PhoenixConnection connection) { + super.returnConnection(connection); + UUID phxConnUniqueID = connection.getUniqueID(); + Preconditions.checkNotNull(phxConnUniqueID, "Got null UUID for Phoenix Connection!"); + if (this.enableActivityLogging) { + this.openConnectionActivityLoggers.remove(phxConnUniqueID); + } + } + + @Override + public int onSweep(boolean internal) { + long currentTimeInMillis = System.currentTimeMillis(); + boolean shouldCollectNow = (currentTimeInMillis - lastCollectedTimeInMillis) >= loggingIntervalInMillis; + int garbageCollectedConnections = 0; + if (this.enableActivityLogging && shouldCollectNow) { + Iterator<Map.Entry<UUID, ConnectionActivityLogger>> iterator = openConnectionActivityLoggers.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry<UUID, ConnectionActivityLogger> entry = iterator.next(); + ConnectionActivityLogger logger = entry.getValue(); + // check for reclaim only for connection/logger that match the sweep type. for e.g internal or external client connections. + boolean checkReclaimable = ((logger.isInternalConnection() && internal) || (!logger.isInternalConnection() && !internal)); + if (checkReclaimable) { + PhoenixConnection monitoredConnection = logger.getConnection(); + LOGGER.info(String.format("connection-sweep-activity-log for %s: %s", logger.getConnectionID(), logger.getActivityLog())); + if (monitoredConnection == null) { + garbageCollectedConnections += collectConnection(internal); + iterator.remove(); + } + } + } + LOGGER.info(String.format("connection-profile-metrics-log for %s: internal=%s, freed=%d, current=%d, open=%d, throttled=%d", + this.profileName, + internal, + garbageCollectedConnections, + internal ? getInternalConnectionCount(): getConnectionCount(), + internal ? + GLOBAL_OPEN_INTERNAL_PHOENIX_CONNECTIONS.getMetric().getValue() : + GLOBAL_OPEN_PHOENIX_CONNECTIONS.getMetric().getValue(), + GLOBAL_PHOENIX_CONNECTIONS_THROTTLED_COUNTER.getMetric().getValue())); + + // Register the last logged time + lastCollectedTimeInMillis = currentTimeInMillis; + + } + return garbageCollectedConnections; + } + + private int collectConnection(boolean internal) { + if (internal && internalConnectionCount > 0) { + --internalConnectionCount; + GLOBAL_OPEN_INTERNAL_PHOENIX_CONNECTIONS.decrement(); + return 1; + } else if (!internal && connectionCount > 0) { + --connectionCount; + GLOBAL_OPEN_PHOENIX_CONNECTIONS.decrement(); + return 1; + } + return 0; + } + + @VisibleForTesting + public Map<String, String> getActivityLog() throws SQLException { + Map<String, String> activityLog = Maps.newHashMap(); + if (this.enableActivityLogging) { + for (ConnectionActivityLogger connectionLogger : openConnectionActivityLoggers.values()) { + activityLog.put(connectionLogger.getConnectionID(), connectionLogger.getActivityLog()); + } + } + return activityLog; + } + + public static class Builder { + + protected String profileName; + protected boolean enableActivityLogging; + protected int loggingIntervalInMins; + protected int maxConnectionsAllowed; + protected int maxInternalConnectionsAllowed; + protected boolean shouldThrottleNumConnections; + + public Builder(boolean shouldThrottleNumConnections) { + this.shouldThrottleNumConnections = shouldThrottleNumConnections; + } + + public Builder withConnectionProfile(String profileName) { + this.profileName = profileName; + return this; + } + + public Builder withMaxAllowed(int maxAllowed) { + this.maxConnectionsAllowed = maxAllowed; + return this; + } + + public Builder withMaxInternalAllowed(int maxInternalAllowed) { + this.maxInternalConnectionsAllowed = maxInternalAllowed; + return this; + } + + public Builder withLogging(boolean enabled) { + this.enableActivityLogging = enabled; + return this; + } + public Builder withLoggingIntervalInMins(int loggingIntervalInMins) { + this.loggingIntervalInMins = loggingIntervalInMins; + return this; + } + + public ConnectionLimiter build() { + return new LoggingConnectionLimiter(this); + } + + } +} diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java index dd62ed125e..b89ad3e435 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java @@ -39,6 +39,7 @@ import org.apache.phoenix.exception.SQLExceptionInfo; import org.apache.phoenix.execute.MutationState; import org.apache.phoenix.hbase.index.util.KeyValueBuilder; import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.log.ConnectionLimiter; import org.apache.phoenix.log.QueryLoggerDisruptor; import org.apache.phoenix.parse.PFunction; import org.apache.phoenix.schema.PColumn; @@ -221,4 +222,8 @@ public interface ConnectionQueryServices extends QueryServices, MetaDataMutated } PMetaData getMetaDataCache(); + public default ConnectionLimiter getConnectionLimiter() { + throw new UnsupportedOperationException(); + } + } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java index 551be97e1c..bf161a9499 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java @@ -67,7 +67,6 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_CONSTANT; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_INDEX_ID; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_HBASE_TABLE_NAME; import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HCONNECTIONS_COUNTER; -import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_PHOENIX_CONNECTIONS_THROTTLED_COUNTER; import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_QUERY_SERVICES_COUNTER; import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNTER_METADATA_INCONSISTENCY; import static org.apache.phoenix.monitoring.MetricType.NUM_SYSTEM_TABLE_RPC_FAILURES; @@ -235,6 +234,9 @@ import org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo; +import org.apache.phoenix.log.ConnectionLimiter; +import org.apache.phoenix.log.DefaultConnectionLimiter; +import org.apache.phoenix.log.LoggingConnectionLimiter; import org.apache.phoenix.log.QueryLoggerDisruptor; import org.apache.phoenix.monitoring.TableMetricsManager; import org.apache.phoenix.parse.PFunction; @@ -331,6 +333,11 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement private final GuidePostsCacheProvider GUIDE_POSTS_CACHE_PROVIDER = new GuidePostsCacheProvider(); protected final Configuration config; + + public ConnectionInfo getConnectionInfo() { + return connectionInfo; + } + protected final ConnectionInfo connectionInfo; // Copy of config.getProps(), but read-only to prevent synchronization that we // don't need. @@ -394,6 +401,11 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement private ServerSideRPCControllerFactory serverSideRPCControllerFactory; private boolean localIndexUpgradeRequired; + private final boolean enableConnectionActivityLogging; + private final int loggingIntervalInMins; + + private final ConnectionLimiter connectionLimiter; + private static interface FeatureSupported { boolean isSupported(ConnectionQueryServices services); } @@ -487,6 +499,30 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement this.maxInternalConnectionsAllowed = config.getInt(QueryServices.INTERNAL_CONNECTION_MAX_ALLOWED_CONNECTIONS, QueryServicesOptions.DEFAULT_INTERNAL_CONNECTION_MAX_ALLOWED_CONNECTIONS); this.shouldThrottleNumConnections = (maxConnectionsAllowed > 0) || (maxInternalConnectionsAllowed > 0); + this.enableConnectionActivityLogging = + config.getBoolean(CONNECTION_ACTIVITY_LOGGING_ENABLED, + QueryServicesOptions.DEFAULT_CONNECTION_ACTIVITY_LOGGING_ENABLED); + this.loggingIntervalInMins = + config.getInt(CONNECTION_ACTIVITY_LOGGING_INTERVAL, + QueryServicesOptions.DEFAULT_CONNECTION_ACTIVITY_LOGGING_INTERVAL_IN_MINS); + + if (enableConnectionActivityLogging) { + LoggingConnectionLimiter.Builder builder = new LoggingConnectionLimiter.Builder(shouldThrottleNumConnections); + connectionLimiter = builder + .withLoggingIntervalInMins(loggingIntervalInMins) + .withLogging(true) + .withMaxAllowed(this.maxConnectionsAllowed) + .withMaxInternalAllowed(this.maxInternalConnectionsAllowed) + .build(); + } else { + DefaultConnectionLimiter.Builder builder = new DefaultConnectionLimiter.Builder(shouldThrottleNumConnections); + connectionLimiter = builder + .withMaxAllowed(this.maxConnectionsAllowed) + .withMaxInternalAllowed(this.maxInternalConnectionsAllowed) + .build(); + } + + if (!QueryUtil.isServerConnection(props)) { //Start queryDistruptor everytime as log level can be change at connection level as well, but we can avoid starting for server connections. try { @@ -1917,6 +1953,10 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } } + @VisibleForTesting + public ConnectionLimiter getConnectionLimiter() { + return connectionLimiter; + } /** * helper function to return the exception from the RPC * @param controller @@ -5641,30 +5681,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement public void addConnection(PhoenixConnection connection) throws SQLException { if (returnSequenceValues || shouldThrottleNumConnections) { synchronized (connectionCountLock) { - - /* - * If we are throttling connections internal connections and client created connections - * are counted separately against each respective quota. - */ - if (shouldThrottleNumConnections) { - int futureConnections = 1 + ( connection.isInternalConnection() ? internalConnectionCount : connectionCount); - int allowedConnections = connection.isInternalConnection() ? maxInternalConnectionsAllowed : maxConnectionsAllowed; - if (allowedConnections != 0 && futureConnections > allowedConnections) { - GLOBAL_PHOENIX_CONNECTIONS_THROTTLED_COUNTER.increment(); - if (connection.isInternalConnection()) { - throw new SQLExceptionInfo.Builder(SQLExceptionCode.NEW_INTERNAL_CONNECTION_THROTTLED). - build().buildException(); - } - throw new SQLExceptionInfo.Builder(SQLExceptionCode.NEW_CONNECTION_THROTTLED). - build().buildException(); - } - } - - if (!connection.isInternalConnection()) { - connectionCount++; - } else { - internalConnectionCount++; - } + connectionLimiter.acquireConnection(connection); } } // If lease renewal isn't enabled, these are never cleaned up. Tracking when renewals @@ -5680,7 +5697,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement ConcurrentMap<SequenceKey,Sequence> formerSequenceMap = null; synchronized (connectionCountLock) { if (!connection.isInternalConnection()) { - if (connectionCount + internalConnectionCount - 1 <= 0) { + if (connectionLimiter.isLastConnection()) { if (!this.sequenceMap.isEmpty()) { formerSequenceMap = this.sequenceMap; this.sequenceMap = Maps.newConcurrentMap(); @@ -5695,13 +5712,9 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement returnAllSequences(formerSequenceMap); } } - if (returnSequenceValues || shouldThrottleNumConnections){ //still need to decrement connection count + if (returnSequenceValues || connectionLimiter.isShouldThrottleNumConnections()) { //still need to decrement connection count synchronized (connectionCountLock) { - if (connection.isInternalConnection() && internalConnectionCount > 0) { - --internalConnectionCount; - } else if (connectionCount > 0) { - --connectionCount; - } + connectionLimiter.returnConnection(connection); } } } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java index 4f86efc8aa..72ee10c215 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java @@ -38,6 +38,7 @@ import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult; import org.apache.phoenix.execute.MutationState; import org.apache.phoenix.hbase.index.util.KeyValueBuilder; import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.log.ConnectionLimiter; import org.apache.phoenix.log.QueryLoggerDisruptor; import org.apache.phoenix.parse.PFunction; import org.apache.phoenix.parse.PSchema; @@ -411,4 +412,8 @@ public class DelegateConnectionQueryServices extends DelegateQueryServices imple public PMetaData getMetaDataCache() { return getDelegate().getMetaDataCache(); } + + public ConnectionLimiter getConnectionLimiter() { + return getDelegate().getConnectionLimiter(); + } } \ No newline at end of file diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java index f980fe041e..a273403195 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java @@ -293,6 +293,10 @@ public interface QueryServices extends SQLCloseable { //max number of connections from a single client to a single cluster. 0 is unlimited. public static final String INTERNAL_CONNECTION_MAX_ALLOWED_CONNECTIONS = "phoenix.internal.connection.max.allowed.connections"; + public static final String CONNECTION_ACTIVITY_LOGGING_ENABLED = + "phoenix.connection.activity.logging.enabled"; + public static final String CONNECTION_ACTIVITY_LOGGING_INTERVAL = + "phoenix.connection.activity.logging.interval"; public static final String DEFAULT_COLUMN_ENCODED_BYTES_ATRRIB = "phoenix.default.column.encoded.bytes.attrib"; public static final String DEFAULT_IMMUTABLE_STORAGE_SCHEME_ATTRIB = "phoenix.default.immutable.storage.scheme"; public static final String DEFAULT_MULTITENANT_IMMUTABLE_STORAGE_SCHEME_ATTRIB = "phoenix.default.multitenant.immutable.storage.scheme"; diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java index adef425d82..81a2a29004 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java @@ -28,6 +28,8 @@ import static org.apache.phoenix.query.QueryServices.CLIENT_METRICS_TAG; import static org.apache.phoenix.query.QueryServices.CLIENT_SPOOL_THRESHOLD_BYTES_ATTRIB; import static org.apache.phoenix.query.QueryServices.COLLECT_REQUEST_LEVEL_METRICS; import static org.apache.phoenix.query.QueryServices.COMMIT_STATS_ASYNC; +import static org.apache.phoenix.query.QueryServices.CONNECTION_ACTIVITY_LOGGING_ENABLED; +import static org.apache.phoenix.query.QueryServices.CONNECTION_ACTIVITY_LOGGING_INTERVAL; import static org.apache.phoenix.query.QueryServices.COST_BASED_OPTIMIZER_ENABLED; import static org.apache.phoenix.query.QueryServices.DATE_FORMAT_ATTRIB; import static org.apache.phoenix.query.QueryServices.DATE_FORMAT_TIMEZONE_ATTRIB; @@ -343,6 +345,9 @@ public class QueryServicesOptions { public static final int DEFAULT_CLIENT_CONNECTION_MAX_ALLOWED_CONNECTIONS = 0; //by default, max internal connections from one client to one cluster is unlimited public static final int DEFAULT_INTERNAL_CONNECTION_MAX_ALLOWED_CONNECTIONS = 0; + + public static final boolean DEFAULT_CONNECTION_ACTIVITY_LOGGING_ENABLED = false; + public static final int DEFAULT_CONNECTION_ACTIVITY_LOGGING_INTERVAL_IN_MINS = 15; public static final boolean DEFAULT_STATS_COLLECTION_ENABLED = true; public static final boolean DEFAULT_USE_STATS_FOR_PARALLELIZATION = true; @@ -497,7 +502,10 @@ public class QueryServicesOptions { .setIfUnset(MAX_REGION_LOCATIONS_SIZE_EXPLAIN_PLAN, DEFAULT_MAX_REGION_LOCATIONS_SIZE_EXPLAIN_PLAN) .setIfUnset(SERVER_MERGE_FOR_UNCOVERED_INDEX, - DEFAULT_SERVER_MERGE_FOR_UNCOVERED_INDEX); + DEFAULT_SERVER_MERGE_FOR_UNCOVERED_INDEX) + .setIfUnset(MAX_IN_LIST_SKIP_SCAN_SIZE, DEFAULT_MAX_IN_LIST_SKIP_SCAN_SIZE) + .setIfUnset(CONNECTION_ACTIVITY_LOGGING_ENABLED, DEFAULT_CONNECTION_ACTIVITY_LOGGING_ENABLED) + .setIfUnset(CONNECTION_ACTIVITY_LOGGING_INTERVAL, DEFAULT_CONNECTION_ACTIVITY_LOGGING_INTERVAL_IN_MINS); // HBase sets this to 1, so we reset it to something more appropriate. // Hopefully HBase will change this, because we can't know if a user set