This is an automated email from the ASF dual-hosted git repository.

jisaac pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/master by this push:
     new f40921ce33 PHOENIX-7063 Track and account garbage collected phoenix 
connections (#1706)
f40921ce33 is described below

commit f40921ce33b51a76d941c0231a6c1acdac2edac4
Author: Lokesh Khurana <khuranalokes...@gmail.com>
AuthorDate: Thu Oct 12 18:24:59 2023 -0700

    PHOENIX-7063 Track and account garbage collected phoenix connections (#1706)
    
    Track and account garbage collected phoenix connections
    ---------
    
    Co-authored-by: Jacob Isaac <jis...@salesforce.com>
    Co-authored-by: Jacob Isaac <jacobpisaa...@gmail.com>
    Co-authored-by: Lokesh Khurana <lokesh.khur...@salesforce.com>
---
 .../phoenix/jdbc/LoggingConnectionLimiterIT.java   | 349 +++++++++++++++++++++
 .../phoenix/jdbc/LoggingHAConnectionLimiterIT.java | 155 +++++++++
 .../jdbc/LoggingSingleConnectionLimiterIT.java     | 135 ++++++++
 .../org/apache/phoenix/jdbc/PhoenixConnection.java |  30 +-
 .../phoenix/jdbc/PhoenixMonitoredConnection.java   |   2 +-
 .../org/apache/phoenix/jdbc/PhoenixStatement.java  |  32 +-
 .../org/apache/phoenix/log/ActivityLogInfo.java    |  62 ++++
 .../apache/phoenix/log/BaseConnectionLimiter.java  | 136 ++++++++
 .../phoenix/log/ConnectionActivityLogger.java      | 150 +++++++++
 .../org/apache/phoenix/log/ConnectionLimiter.java  |  39 +++
 .../phoenix/log/DefaultConnectionLimiter.java      |  64 ++++
 .../phoenix/log/LoggingConnectionLimiter.java      | 181 +++++++++++
 .../phoenix/query/ConnectionQueryServices.java     |   5 +
 .../phoenix/query/ConnectionQueryServicesImpl.java |  77 +++--
 .../query/DelegateConnectionQueryServices.java     |   5 +
 .../org/apache/phoenix/query/QueryServices.java    |   4 +
 .../apache/phoenix/query/QueryServicesOptions.java |  10 +-
 17 files changed, 1399 insertions(+), 37 deletions(-)

diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/LoggingConnectionLimiterIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/LoggingConnectionLimiterIT.java
new file mode 100644
index 0000000000..3a313f4973
--- /dev/null
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/LoggingConnectionLimiterIT.java
@@ -0,0 +1,349 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.jdbc;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.log.ConnectionLimiter;
+import org.apache.phoenix.log.LoggingConnectionLimiter;
+import org.apache.phoenix.query.BaseTest;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.PreparedStatement;
+import java.sql.Timestamp;
+import java.util.HashSet;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.Set;
+import java.util.Random;
+
+
+import java.time.Instant;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.Assert.*;
+
+@Category(NeedsOwnMiniClusterTest.class)
+public abstract class LoggingConnectionLimiterIT extends BaseTest {
+    private static enum ActivityType {
+        QUERY, LOAD
+    }
+    //private static final Logger LOG = 
LoggerFactory.getLogger(LoggingConnectionLimiterIT.class);
+    private static final Instant NOW = Instant.now();
+    private static final String tableName = generateUniqueName();
+    private static final String UPSERT_SQL = "UPSERT INTO " + tableName + 
"(ORGANIZATION_ID, CLIENT_TYPE, GROUP_ID, MY_KEY, MY_VALUE, SIZE, NEXT_CHUNK, 
POD, CREATED_DATE, EXPIRY_DATE) values (?,?,?,?,?,?,?,?,?,?)";
+    private static final String GROUP_CONDITION = "ORGANIZATION_ID=? and 
CLIENT_TYPE=? and GROUP_ID=?";
+    private static final String KEY_CONDITION = GROUP_CONDITION + " and 
MY_KEY=?";
+
+    private static final String SELECT_KEY_SQL = "SELECT EXPIRY_DATE, 
NEXT_CHUNK, MY_VALUE, CREATED_DATE FROM " + tableName + " WHERE " + 
KEY_CONDITION;
+    protected static final String CREATE_TABLE_SQL = String.format("CREATE 
TABLE IF NOT EXISTS %s (  \n" +
+            "  ORGANIZATION_ID CHAR(18) NOT NULL,  \n" +
+            "  CLIENT_TYPE VARCHAR NOT NULL,  \n" +
+            "  GROUP_ID VARCHAR NOT NULL,  \n" +
+            "  MY_KEY VARCHAR NOT NULL,  \n" +
+            "  MY_VALUE VARBINARY,  \n" +
+            "  SIZE INTEGER,\n" +
+            "  NEXT_CHUNK BOOLEAN,\n" +
+            "  POD VARCHAR,  \n" +
+            "  CREATED_DATE DATE,\n" +
+            "  EXPIRY_DATE DATE,\n" +
+            "  CONSTRAINT PK_DATA PRIMARY KEY   \n" +
+            "  (  \n" +
+            "    ORGANIZATION_ID,  \n" +
+            "    CLIENT_TYPE,  \n" +
+            "    GROUP_ID,  \n" +
+            "    MY_KEY  \n" +
+            "  )  \n" +
+            ") IMMUTABLE_ROWS=true, VERSIONS=1, DISABLE_TABLE_SOR=true, 
REPLICATION_SCOPE=1, TTL=864000", tableName);
+
+    protected static final String ORG_ID = "org000000000000001";
+    protected static final String GROUP_ID = "groupId";
+
+    @Rule
+    public TestName testName = new TestName();
+
+    /**
+     * Case: when connections are closed
+     * Expectation:
+     * No failures due to throttling.
+     * # of not closed connection = 0
+     * logs contain o=upserted (indicating upsert operation)
+     */
+    @Test
+    public void testWhenAllConnectionsClosed() throws Exception {
+        int loadFailures = runSampleActivity(ActivityType.LOAD, 10, 100, 10, 0);
+        ConnectionLimiter limiter = getConnectionLimiter();
+        assertTrue(limiter instanceof LoggingConnectionLimiter);
+        LoggingConnectionLimiter loggingLimiter = (LoggingConnectionLimiter) limiter;
+        int count = loggingLimiter.getConnectionCount();
+        // assertEquals reports the actual value on failure, unlike assertTrue(a == b).
+        assertEquals("Should not have any failures", 0, loadFailures);
+        assertEquals("Num connections not closed not as expected", 0, count);
+        Map<String, String> logs = loggingLimiter.getActivityLog();
+        for (String activity : logs.values()) {
+            assertTrue("Unexpected activity log entry: " + activity,
+                    activity.contains("o=upserted"));
+        }
+    }
+
+    /**
+     * Case: when connections not closed is less than throttling threshold
+     * Expectation:
+     * No failures due to throttling.
+     * # of not closed connection = numDoNotClose
+     * logs contain o=upserted (indicating upsert operation)
+     */
+    @Test
+    public void testActivityLogsOnUpsertsWhenNoFailures() throws Exception {
+        int loadFailures = runSampleActivity(ActivityType.LOAD, 10, 100, 10, 10);
+        ConnectionLimiter limiter = getConnectionLimiter();
+        assertTrue(limiter instanceof LoggingConnectionLimiter);
+        LoggingConnectionLimiter loggingLimiter = (LoggingConnectionLimiter) limiter;
+        int count = loggingLimiter.getConnectionCount();
+        // assertEquals reports the actual value on failure, unlike assertTrue(a == b).
+        assertEquals("Should not have any failures", 0, loadFailures);
+        assertEquals("Num connections not closed not as expected", 10, count);
+        Map<String, String> logs = loggingLimiter.getActivityLog();
+        for (String activity : logs.values()) {
+            assertTrue("Unexpected activity log entry: " + activity,
+                    activity.contains("o=upserted"));
+        }
+    }
+
+    /**
+     * Case: when connections that are not closed is less than throttling threshold
+     * Expectation:
+     * No failures due to throttling.
+     * # of not closed connection = numDoNotClose
+     * logs contain o=queried (indicating query operation)
+     */
+    @Test
+    public void testActivityLogsOnQueryWhenNoFailures() throws Exception {
+        int queryFailures = runSampleActivity(ActivityType.QUERY, 10, 100, 10, 10);
+        ConnectionLimiter limiter = getConnectionLimiter();
+        assertTrue(limiter instanceof LoggingConnectionLimiter);
+        LoggingConnectionLimiter loggingLimiter = (LoggingConnectionLimiter) limiter;
+        int count = loggingLimiter.getConnectionCount();
+        // assertEquals reports the actual value on failure, unlike assertTrue(a == b).
+        assertEquals("Should not have any failures", 0, queryFailures);
+        assertEquals("Num connections not closed not as expected", 10, count);
+        Map<String, String> logs = loggingLimiter.getActivityLog();
+        for (String activity : logs.values()) {
+            assertTrue("Unexpected activity log entry: " + activity,
+                    activity.contains("o=queried"));
+        }
+    }
+
+    /**
+     * Case: when connections not closed is >= throttling threshold
+     * Expectation:
+     * Some failures due to throttling.
+     * Throttling will kick in when the # of active threads + # of not closed
+     * connection >= throttling threshold.
+     * logs contain o=upserted (indicating upsert operation)
+     */
+    @Test
+    public void testActivityLogsOnUpsertWhenFailures() throws Exception {
+        int loadFailures = runSampleActivity(ActivityType.LOAD, 10, 100, 10, 20);
+        ConnectionLimiter limiter = getConnectionLimiter();
+        assertTrue(limiter instanceof LoggingConnectionLimiter);
+        LoggingConnectionLimiter loggingLimiter = (LoggingConnectionLimiter) limiter;
+        int count = loggingLimiter.getConnectionCount();
+        assertTrue("Should have some failures", loadFailures > 0);
+        // The message previously lacked its closing bracket.
+        assertTrue(String.format(
+                "Num connections not closed not as expected [expected >= %d, actual = %d]",
+                10, count), count >= 10);
+        Map<String, String> logs = loggingLimiter.getActivityLog();
+        for (String activity : logs.values()) {
+            assertTrue("Unexpected activity log entry: " + activity,
+                    activity.contains("o=upserted"));
+        }
+    }
+
+    /**
+     * Case: when connections not closed is >= throttling threshold
+     * Expectation:
+     * Some failures due to throttling.
+     * Throttling will kick in when the # of active threads + # of not closed
+     * connection >= throttling threshold.
+     * logs contain o=queried (indicating query operation)
+     */
+    @Test
+    public void testActivityLogsOnQueryWhenFailures() throws Exception {
+        int queryFailures = runSampleActivity(ActivityType.QUERY, 10, 100, 10, 20);
+        ConnectionLimiter limiter = getConnectionLimiter();
+        assertTrue(limiter instanceof LoggingConnectionLimiter);
+        LoggingConnectionLimiter loggingLimiter = (LoggingConnectionLimiter) limiter;
+        int count = loggingLimiter.getConnectionCount();
+        assertTrue("Should have some failures", queryFailures > 0);
+        // The message previously lacked its closing bracket.
+        assertTrue(String.format(
+                "Num connections not closed not as expected [expected >= %d, actual = %d]",
+                10, count), count >= 10);
+        Map<String, String> logs = loggingLimiter.getActivityLog();
+        for (String activity : logs.values()) {
+            assertTrue("Unexpected activity log entry: " + activity,
+                    activity.contains("o=queried"));
+        }
+    }
+
+    /**
+     * Returns the connection limiter under test. The tests in this class expect it to be a
+     * {@link LoggingConnectionLimiter} and read its connection count and activity log.
+     *
+     * @return the limiter whose state the tests assert on
+     * @throws Exception if the limiter cannot be obtained
+     */
+    protected abstract ConnectionLimiter getConnectionLimiter() throws 
Exception ;
+
+    /**
+     * Runs {@code clientQueue} simulated client calls on a pool of {@code clientPool} threads.
+     * Each call opens a connection via {@link #getConnection()}, performs the requested
+     * activity and then closes the connection, except for {@code connNotClosed} randomly
+     * chosen calls that deliberately leave their connection open.
+     *
+     * @param activityType  LOAD upserts rows; QUERY upserts rows and then reads one back
+     * @param clientPool    number of active client threads
+     * @param clientQueue   total number of client calls per test run
+     * @param numRows       number of rows each client call upserts
+     * @param connNotClosed number of client calls that skip closing their connection
+     * @return the number of client calls that completed exceptionally
+     * @throws Exception if the client calls cannot be scheduled
+     */
+    protected int runSampleActivity(ActivityType activityType, int clientPool, int clientQueue,
+            int numRows, int connNotClosed) throws Exception {
+        Random rnd = new Random();
+
+        // Pick exactly connNotClosed distinct client indexes (capped at clientQueue). Drawing
+        // until the set is full avoids coming up short when random draws collide.
+        Set<Integer> skipCloseForClients = new HashSet<>();
+        int numToSkip = Math.min(connNotClosed, clientQueue);
+        while (skipCloseForClients.size() < numToSkip) {
+            skipCloseForClients.add(rnd.nextInt(clientQueue));
+        }
+
+        ExecutorService executorService = new ThreadPoolExecutor(clientPool, clientPool, 10,
+                TimeUnit.SECONDS, new ArrayBlockingQueue<>(clientQueue));
+        try {
+            ArrayList<CompletableFuture<String>> clientCallList = new ArrayList<>();
+            ArrayList<CompletableFuture<?>> mayHaveFailedList = new ArrayList<>();
+            for (int i = 0; i < clientQueue; i++) {
+                CompletableFuture<String> clientCall = new CompletableFuture<>();
+                // The set is only read from the pool threads, so sharing it is safe.
+                CompletableFuture<?> mayHaveException = clientCall.whenCompleteAsync(
+                        (r, e) -> runClientCall(activityType, numRows,
+                                !skipCloseForClients.contains(Integer.parseInt(r))),
+                        executorService);
+                mayHaveFailedList.add(mayHaveException);
+                clientCallList.add(clientCall);
+            }
+
+            // Explicitly complete the future (client call) to invoke open and close methods.
+            for (int i = 0; i < clientCallList.size(); i++) {
+                clientCallList.get(i).complete(String.valueOf(i));
+            }
+
+            // Wait for every client call and count the ones that failed. handle() maps both
+            // outcomes to a normal result, so join() blocks until the call is really done
+            // and never throws for a failed call.
+            int failedCount = 0;
+            for (CompletableFuture<?> cf : mayHaveFailedList) {
+                if (cf.handle((r, e) -> e != null).join()) {
+                    failedCount++;
+                }
+            }
+            return failedCount;
+        } finally {
+            executorService.shutdown();
+        }
+    }
+
+    /**
+     * Performs one simulated client call: opens a connection, runs the activity and closes
+     * the connection unless the caller asked to leave it open.
+     */
+    private void runClientCall(ActivityType activityType, int numRows, boolean closeConnection) {
+        try {
+            String orgId =
+                    StringUtils.rightPad(BaseTest.generateUniqueName(), 15).substring(0, 15);
+            String groupId = testName.getMethodName();
+            Connection connection = getConnection();
+            try {
+                connection.setAutoCommit(false);
+                switch (activityType) {
+                    case LOAD:
+                        loadData(connection, orgId, groupId, numRows, 20);
+                        break;
+                    case QUERY:
+                        loadData(connection, orgId, groupId, numRows, 20);
+                        queryData(connection, orgId, groupId);
+                        break;
+                    default:
+                        break;
+                }
+            } finally {
+                if (closeConnection) {
+                    connection.close();
+                }
+            }
+        } catch (Exception ex) {
+            // Surface every failure (including throttling SQLExceptions) through the future.
+            throw new RuntimeException(ex);
+        }
+    }
+
+    /**
+     * Upserts {@code rows} rows for the given organization and group. Row keys are the
+     * strings "0" .. "rows-1"; each row's value is a single byte holding key + 1.
+     * When auto-commit is off, commits after every {@code batchSize} rows and once at the end.
+     *
+     * @param connection connection to write through
+     * @param orgId      value for ORGANIZATION_ID
+     * @param groupId    value for GROUP_ID
+     * @param rows       number of rows to upsert
+     * @param batchSize  rows per intermediate commit; values <= 0 disable intermediate commits
+     * @throws SQLException if an upsert or commit fails
+     */
+    protected static void loadData(Connection connection, String orgId, String groupId, int rows,
+            int batchSize) throws SQLException {
+        boolean manualCommit = !connection.getAutoCommit();
+        //See W-8064529 for reuse of the preparedstatement
+        for (int row = 0; row < rows; row++) {
+            try (PreparedStatement ps = connection.prepareStatement(UPSERT_SQL)) {
+                ps.setString(1, orgId);
+                ps.setString(2, "CLIENT_TYPE");
+                ps.setString(3, groupId);
+                ps.setString(4, String.valueOf(row));
+                // Value is key + 1 truncated to a byte; queryData asserts on this.
+                ps.setBytes(5, new byte[]{(byte) (row + 1)});
+                ps.setInt(6, 1);
+                ps.setBoolean(7, false);
+                ps.setString(8, "pod");
+                ps.setTimestamp(9, Timestamp.from(NOW));
+                ps.setTimestamp(10, Timestamp.from(NOW.plusSeconds(3600)));
+                int result = ps.executeUpdate();
+                if (result != 1) {
+                    throw new RuntimeException(
+                            "Phoenix error: upsert count is not one. It is " + result);
+                }
+            }
+            // Guard batchSize so a zero value cannot cause a division by zero.
+            if (manualCommit && batchSize > 0 && (row + 1) % batchSize == 0) {
+                connection.commit();
+            }
+        }
+        if (manualCommit) {
+            connection.commit(); //send any remaining rows
+        }
+    }
+
+    /**
+     * Reads back the row with key "3" for the given organization and group and asserts that
+     * exactly one row exists and that its value is the single byte 4.
+     *
+     * @param connection connection to query through
+     * @param orgId      value for ORGANIZATION_ID
+     * @param groupId    value for GROUP_ID
+     * @throws SQLException if the query fails
+     */
+    protected void queryData(Connection connection, String orgId, String groupId)
+            throws SQLException {
+        try (PreparedStatement statement = connection.prepareStatement(SELECT_KEY_SQL)) {
+            statement.setString(1, orgId);
+            statement.setString(2, "CLIENT_TYPE");
+            statement.setString(3, groupId);
+            statement.setString(4, "3");
+
+            try (ResultSet rs = statement.executeQuery()) {
+                assertTrue(rs.next());
+                //counter gets incremented prior to putting value so 3+1=4
+                assertArrayEquals(new byte[]{(byte) 4}, rs.getBytes(3));
+                assertFalse(rs.next());
+            }
+        }
+    }
+
+    /**
+     * Opens a new connection for one simulated client call. The caller decides whether and
+     * when to close it.
+     *
+     * @return a newly opened connection
+     * @throws SQLException if the connection cannot be opened
+     */
+    protected abstract Connection getConnection() throws SQLException;
+
+}
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/LoggingHAConnectionLimiterIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/LoggingHAConnectionLimiterIT.java
new file mode 100644
index 0000000000..c4075ffdd5
--- /dev/null
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/LoggingHAConnectionLimiterIT.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.jdbc;
+
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.log.ConnectionLimiter;
+import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static 
org.apache.phoenix.jdbc.HighAvailabilityGroup.PHOENIX_HA_GROUP_ATTR;
+import static org.apache.phoenix.jdbc.HighAvailabilityPolicy.PARALLEL;
+
+@Category(NeedsOwnMiniClusterTest.class)
+public class LoggingHAConnectionLimiterIT extends LoggingConnectionLimiterIT {
+    private static final Logger LOG = 
LoggerFactory.getLogger(LoggingHAConnectionLimiterIT.class);
+    private static final 
HighAvailabilityTestingUtility.HBaseTestingUtilityPair CLUSTERS = new 
HighAvailabilityTestingUtility.HBaseTestingUtilityPair();
+    private static Map<String, String> GLOBAL_PROPERTIES ;
+
+    private static List<Connection> CONNECTIONS = null;
+
+    /**
+     * Client properties to create a connection per test.
+     */
+    private Properties clientProperties;
+    /**
+     * JDBC connection string for this test HA group.
+     */
+    private String jdbcUrl;
+    /**
+     * HA group for this test.
+     */
+    private HighAvailabilityGroup haGroup;
+
+    /**
+     * Starts the cluster pair, registers the Phoenix driver, creates the test table through
+     * one connection per cluster and preloads 100 rows through a cluster-1 connection.
+     */
+    @BeforeClass
+    public static final void doSetup() throws Exception {
+        /*
+         *  Turn on the connection logging feature
+         *  CLIENT_CONNECTION_MAX_ALLOWED_CONNECTIONS : max allowed connections before
+         *  throttling
+         *  INTERNAL_CONNECTION_MAX_ALLOWED_CONNECTIONS : max allowed internal connections
+         *  before throttling
+         *  HA_MAX_POOL_SIZE : HA thread pool size for open and other activities
+         *  HA_MAX_QUEUE_SIZE : Queue size of the core thread pool
+         */
+        // A plain HashMap avoids the anonymous subclass created by double-brace init.
+        GLOBAL_PROPERTIES = new HashMap<>();
+        GLOBAL_PROPERTIES.put(QueryServices.CONNECTION_ACTIVITY_LOGGING_ENABLED,
+                String.valueOf(true));
+        GLOBAL_PROPERTIES.put(QueryServices.CLIENT_CONNECTION_MAX_ALLOWED_CONNECTIONS,
+                String.valueOf(20));
+        GLOBAL_PROPERTIES.put(QueryServices.INTERNAL_CONNECTION_MAX_ALLOWED_CONNECTIONS,
+                String.valueOf(20));
+        GLOBAL_PROPERTIES.put(PhoenixHAExecutorServiceProvider.HA_MAX_POOL_SIZE,
+                String.valueOf(5));
+        GLOBAL_PROPERTIES.put(PhoenixHAExecutorServiceProvider.HA_MAX_QUEUE_SIZE,
+                String.valueOf(30));
+
+        CLUSTERS.start();
+        DriverManager.registerDriver(PhoenixDriver.INSTANCE);
+        GLOBAL_PROPERTIES.put(PHOENIX_HA_GROUP_ATTR, PARALLEL.name());
+
+        CONNECTIONS = Lists.newArrayList(CLUSTERS.getCluster1Connection(),
+                CLUSTERS.getCluster2Connection());
+        LOG.info(String.format("************* Num connections : %d", CONNECTIONS.size()));
+
+        for (Connection conn : CONNECTIONS) {
+            try (Statement statement = conn.createStatement()) {
+                statement.execute(CREATE_TABLE_SQL);
+            }
+            conn.commit();
+        }
+
+        //preload some data
+        try (Connection connection = CLUSTERS.getCluster1Connection()) {
+            loadData(connection, ORG_ID, GROUP_ID, 100, 20);
+        }
+    }
+
+
+    /**
+     * Closes the per-cluster setup connections, deregisters the Phoenix driver and shuts
+     * down the cluster pair. Later steps still run when an earlier one throws.
+     */
+    @AfterClass
+    public static void tearDownAfterClass() throws Exception {
+        try {
+            // CONNECTIONS stays null when doSetup failed before creating it.
+            if (CONNECTIONS != null) {
+                for (Connection conn : CONNECTIONS) {
+                    conn.close();
+                }
+            }
+        } finally {
+            try {
+                DriverManager.deregisterDriver(PhoenixDriver.INSTANCE);
+            } finally {
+                CLUSTERS.close();
+            }
+        }
+    }
+
+    /**
+     * Prepares per-test state: client properties copied from the global ones with the HA
+     * group named after the running test method, the JDBC URL covering both clusters, and
+     * the HA group resolved from that URL.
+     */
+    @Before
+    public void setup() throws Exception {
+        final String groupName = testName.getMethodName();
+
+        Properties props = new Properties();
+        props.putAll(GLOBAL_PROPERTIES);
+        props.setProperty(PHOENIX_HA_GROUP_ATTR, groupName);
+        clientProperties = props;
+
+        // NOTE(review): the original comment said this makes the first cluster ACTIVE;
+        // the call itself only shows the PARALLEL policy being passed - confirm.
+        CLUSTERS.initClusterRole(groupName, PARALLEL);
+
+        final String url1 = CLUSTERS.getUrl1();
+        final String url2 = CLUSTERS.getUrl2();
+        jdbcUrl = String.format("jdbc:phoenix:[%s|%s]", url1, url2);
+        haGroup = HighAvailabilityTestingUtility.getHighAvailibilityGroup(jdbcUrl,
+                clientProperties);
+        LOG.info("Initialized haGroup {} with URL {}", haGroup.getGroupInfo().getName(),
+                jdbcUrl);
+    }
+
+    /**
+     * Opens a temporary connection, unwraps it to a {@link ParallelPhoenixConnection} and
+     * returns the limiter of the query services behind its first future connection. The
+     * temporary connection is closed before returning.
+     */
+    @Override
+    protected ConnectionLimiter getConnectionLimiter() throws Exception {
+        // try-with-resources keeps a failure in the body from being masked by close().
+        try (Connection testConnection = getConnection()) {
+            ParallelPhoenixConnection phoenixConnection =
+                    testConnection.unwrap(ParallelPhoenixConnection.class);
+            ConnectionQueryServices cqs =
+                    phoenixConnection.getFutureConnection1().get().getQueryServices();
+            return cqs.getConnectionLimiter();
+        }
+    }
+
+    /**
+     * Opens a connection to the HA JDBC URL with this test's client properties and turns
+     * auto-commit on.
+     */
+    @Override
+    protected Connection getConnection() throws SQLException {
+        Connection connection = DriverManager.getConnection(jdbcUrl, clientProperties);
+        try {
+            connection.setAutoCommit(true);
+        } catch (SQLException | RuntimeException e) {
+            // Do not leave an open connection behind when it cannot be handed to the caller.
+            try {
+                connection.close();
+            } catch (SQLException closeFailure) {
+                e.addSuppressed(closeFailure);
+            }
+            throw e;
+        }
+        return connection;
+    }
+
+}
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/LoggingSingleConnectionLimiterIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/LoggingSingleConnectionLimiterIT.java
new file mode 100644
index 0000000000..fb0ad2871d
--- /dev/null
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/LoggingSingleConnectionLimiterIT.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.jdbc;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.log.ConnectionLimiter;
+import org.apache.phoenix.query.ConfigurationFactory;
+import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.query.HBaseFactoryProvider;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.InstanceResolver;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Properties;
+
+@Category(NeedsOwnMiniClusterTest.class)
+public class LoggingSingleConnectionLimiterIT extends 
LoggingConnectionLimiterIT {
+    private static final Logger LOG = 
LoggerFactory.getLogger(LoggingSingleConnectionLimiterIT.class);
+
+    /**
+     * Installs a configuration factory that enables connection activity logging and sets the
+     * connection limits, starts a mini cluster, registers the Phoenix driver, creates the
+     * test table and preloads 100 rows.
+     */
+    @BeforeClass
+    public static void doSetup() throws Exception {
+        /*
+         *  Turn on the connection logging feature
+         *  CLIENT_CONNECTION_MAX_ALLOWED_CONNECTIONS : max allowed connections before
+         *  throttling
+         *  INTERNAL_CONNECTION_MAX_ALLOWED_CONNECTIONS : max allowed internal connections
+         *  before throttling
+         *  HA_MAX_POOL_SIZE : HA thread pool size for open and other activities
+         *  HA_MAX_QUEUE_SIZE : Queue size of the core thread pool
+         */
+
+        InstanceResolver.clearSingletons();
+        // Override to get required config for static fields loaded that require HBase config
+        InstanceResolver.getSingleton(ConfigurationFactory.class, new ConfigurationFactory() {
+
+            @Override public Configuration getConfiguration() {
+                Configuration conf = HBaseConfiguration.create();
+                conf.set(QueryServices.CONNECTION_ACTIVITY_LOGGING_ENABLED,
+                        String.valueOf(true));
+                conf.set(QueryServices.CLIENT_CONNECTION_MAX_ALLOWED_CONNECTIONS,
+                        String.valueOf(20));
+                conf.set(QueryServices.INTERNAL_CONNECTION_MAX_ALLOWED_CONNECTIONS,
+                        String.valueOf(20));
+                conf.set(PhoenixHAExecutorServiceProvider.HA_MAX_POOL_SIZE, String.valueOf(5));
+                conf.set(PhoenixHAExecutorServiceProvider.HA_MAX_QUEUE_SIZE,
+                        String.valueOf(30));
+                return conf;
+            }
+
+            @Override public Configuration getConfiguration(Configuration confToClone) {
+                // Reuse the no-arg settings instead of repeating them, then layer
+                // confToClone on top of a copy.
+                Configuration copy = new Configuration(getConfiguration());
+                copy.addResource(confToClone);
+                return copy;
+            }
+        });
+
+        Configuration conf = HBaseFactoryProvider.getConfigurationFactory().getConfiguration();
+        // NOTE(review): nothing visible in this class stops this mini cluster - confirm that
+        // teardown happens elsewhere.
+        HBaseTestingUtility hBaseTestingUtility = new HBaseTestingUtility(conf);
+        setUpConfigForMiniCluster(conf);
+        hBaseTestingUtility.startMiniCluster();
+        // establish url and quorum. Need to use PhoenixDriver and not PhoenixTestDriver
+        String zkQuorum = "localhost:" + hBaseTestingUtility.getZkCluster().getClientPort();
+        url = PhoenixRuntime.JDBC_PROTOCOL + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkQuorum;
+        DriverManager.registerDriver(PhoenixDriver.INSTANCE);
+
+        String profileName = "setup";
+        final String urlWithPrinc = url + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + profileName
+                + PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR;
+        Properties props = new Properties();
+
+        try (Connection connection = DriverManager.getConnection(urlWithPrinc, props)) {
+            try (Statement statement = connection.createStatement()) {
+                statement.execute(CREATE_TABLE_SQL);
+            }
+            connection.commit();
+        }
+
+        //preload some data
+        try (Connection connection = DriverManager.getConnection(urlWithPrinc, props)) {
+            loadData(connection, ORG_ID, GROUP_ID, 100, 20);
+        }
+
+    }
+    /**
+     * Opens a temporary connection, unwraps it to a {@link PhoenixConnection} and returns
+     * the limiter of its query services. The temporary connection is closed before returning.
+     */
+    @Override
+    protected ConnectionLimiter getConnectionLimiter() throws Exception {
+        // try-with-resources keeps a failure in the body from being masked by close().
+        try (Connection testConnection = getConnection()) {
+            PhoenixConnection phoenixConnection =
+                    testConnection.unwrap(PhoenixConnection.class);
+            ConnectionQueryServices cqs = phoenixConnection.getQueryServices();
+            return cqs.getConnectionLimiter();
+        }
+    }
+
+    /**
+     * Opens a connection whose URL carries the running test method's name as its profile
+     * segment, and turns auto-commit on.
+     */
+    @Override
+    protected Connection getConnection() throws SQLException {
+        String profileName = testName.getMethodName();
+        final String urlWithPrinc = url + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + profileName
+                + PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR;
+        Properties props = new Properties();
+        Connection connection = DriverManager.getConnection(urlWithPrinc, props);
+        try {
+            connection.setAutoCommit(true);
+        } catch (SQLException | RuntimeException e) {
+            // Do not leave an open connection behind when it cannot be handed to the caller.
+            try {
+                connection.close();
+            } catch (SQLException closeFailure) {
+                e.addSuppressed(closeFailure);
+            }
+            throw e;
+        }
+        return connection;
+    }
+
+}
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java 
b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
index 2d88a142d2..493fdf5037 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
@@ -50,12 +50,15 @@ import java.sql.Savepoint;
 import java.sql.Statement;
 import java.sql.Struct;
 import java.text.Format;
+
+
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.HashMap;
 import java.util.Properties;
+import java.util.UUID;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executor;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -80,6 +83,8 @@ import org.apache.phoenix.iterate.ParallelIteratorFactory;
 import org.apache.phoenix.iterate.TableResultIterator;
 import org.apache.phoenix.iterate.TableResultIteratorFactory;
 import org.apache.phoenix.jdbc.PhoenixStatement.PhoenixStatementParser;
+import org.apache.phoenix.log.ActivityLogInfo;
+import org.apache.phoenix.log.ConnectionActivityLogger;
 import org.apache.phoenix.log.LogLevel;
 import org.apache.phoenix.monitoring.MetricType;
 import org.apache.phoenix.monitoring.TableMetricsManager;
@@ -192,6 +197,8 @@ public class PhoenixConnection implements MetaDataMutated, 
SQLCloseable, Phoenix
     //public interfaces.
     private final boolean isInternalConnection;
     private boolean isApplyTimeZoneDisplacement;
+    private final UUID uniqueID;
+    private ConnectionActivityLogger connectionActivityLogger = 
ConnectionActivityLogger.NO_OP_LOGGER;
 
     static {
         Tracing.addTraceMetricsSource();
@@ -379,7 +386,7 @@ public class PhoenixConnection implements MetaDataMutated, 
SQLCloseable, Phoenix
                     this.services.getProps());
             this.mutationState = mutationState == null ? 
newMutationState(maxSize,
                     maxSizeBytes) : new MutationState(mutationState, this);
-
+            this.uniqueID = UUID.randomUUID();
             this.services.addConnection(this);
 
             // setup tracing, if its enabled
@@ -957,6 +964,10 @@ public class PhoenixConnection implements MetaDataMutated, 
SQLCloseable, Phoenix
         return new PhoenixDatabaseMetaData(this);
     }
 
+    /**
+     * Returns the identifier of this connection.
+     *
+     * @return the value held in the {@code uniqueID} field
+     */
+    public UUID getUniqueID() {
+        return uniqueID;
+    }
+
     @Override
     public int getTransactionIsolation() throws SQLException {
         boolean transactionsEnabled = getQueryServices().getProps().getBoolean(
@@ -1303,6 +1314,9 @@ public class PhoenixConnection implements 
MetaDataMutated, SQLCloseable, Phoenix
 
+    /**
+     * Bumps the statement execution counter of this connection. When the activity logger
+     * has the level of ActivityLogInfo.OP_STMTS enabled, the updated counter value is also
+     * recorded with it.
+     */
     public void incrementStatementExecutionCounter() {
         statementExecutionCounter++;
+        if 
(connectionActivityLogger.isLevelEnabled(ActivityLogInfo.OP_STMTS.getLogLevel()))
 {
+            connectionActivityLogger.log(ActivityLogInfo.OP_STMTS, 
String.valueOf(statementExecutionCounter));
+        }
     }
 
     public TraceScope getTraceScope() {
@@ -1431,4 +1445,16 @@ public class PhoenixConnection implements 
MetaDataMutated, SQLCloseable, Phoenix
+    /**
+     * @return the current value of the {@code isApplyTimeZoneDisplacement} field
+     */
     public boolean isApplyTimeZoneDisplacement() {
         return isApplyTimeZoneDisplacement;
     }
+
+    /**
+     * Returns the formatted activity log produced by this connection's activity logger.
+     *
+     * @return whatever {@code getActivityLogger().getActivityLog()} yields
+     */
+    public String getActivityLog() {
+        final ConnectionActivityLogger activityLogger = getActivityLogger();
+        return activityLogger.getActivityLog();
+    }
+
+    /**
+     * @return the activity logger currently attached to this connection
+     */
+    public ConnectionActivityLogger getActivityLogger() {
+        return connectionActivityLogger;
+    }
+
+    /**
+     * Attaches an activity logger to this connection.
+     * <p>
+     * The logger is dereferenced without a null check elsewhere (for example when the
+     * statement execution counter is incremented), so a {@code null} argument is replaced
+     * by {@link ConnectionActivityLogger#NO_OP_LOGGER} rather than stored.
+     *
+     * @param connectionActivityLogger the logger to attach; {@code null} selects the
+     *                                 no-op logger
+     */
+    public void setActivityLogger(ConnectionActivityLogger connectionActivityLogger) {
+        this.connectionActivityLogger = connectionActivityLogger == null
+                ? ConnectionActivityLogger.NO_OP_LOGGER
+                : connectionActivityLogger;
+    }
 }
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixMonitoredConnection.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixMonitoredConnection.java
index 5f94689864..d017f7bbcc 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixMonitoredConnection.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixMonitoredConnection.java
@@ -21,7 +21,6 @@ package org.apache.phoenix.jdbc;
 import org.apache.phoenix.monitoring.MetricType;
 
 import java.sql.Connection;
-import java.sql.ResultSet;
 import java.util.Map;
 
 /**
@@ -48,4 +47,5 @@ public interface PhoenixMonitoredConnection extends 
Connection {
      * metrics for individual DML.
      */
     void clearMetrics();
+
 }
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java 
b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index d73f1cbc15..1996577fcb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -113,6 +113,7 @@ import org.apache.phoenix.iterate.ExplainTable;
 import org.apache.phoenix.iterate.MaterializedResultIterator;
 import org.apache.phoenix.iterate.ParallelScanGrouper;
 import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.log.ActivityLogInfo;
 import org.apache.phoenix.log.AuditQueryLogger;
 import org.apache.phoenix.log.LogLevel;
 import org.apache.phoenix.log.QueryLogInfo;
@@ -276,10 +277,13 @@ public class PhoenixStatement implements 
PhoenixMonitoredStatement, SQLCloseable
 
     protected final PhoenixConnection connection;
     private static final int NO_UPDATE = -1;
+    private static final String TABLE_UNKNOWN = "";
     private List<PhoenixResultSet> resultSets = new 
ArrayList<PhoenixResultSet>();
     private QueryPlan lastQueryPlan;
     private PhoenixResultSet lastResultSet;
     private int lastUpdateCount = NO_UPDATE;
+
+    private String lastUpdateTable = TABLE_UNKNOWN;
     private Operation lastUpdateOperation;
     private boolean isClosed = false;
     private int maxRows;
@@ -397,6 +401,7 @@ public class PhoenixStatement implements 
PhoenixMonitoredStatement, SQLCloseable
                                 setLastQueryPlan(plan);
                                 setLastResultSet(rs);
                                 setLastUpdateCount(NO_UPDATE);
+                                setLastUpdateTable(tableName == null ? 
TABLE_UNKNOWN : tableName);
                                 setLastUpdateOperation(stmt.getOperation());
                                 // If transactional, this will move the read 
pointer forward
                                 if (connection.getAutoCommit() && !noCommit) {
@@ -568,6 +573,7 @@ public class PhoenixStatement implements 
PhoenixMonitoredStatement, SQLCloseable
                                 int lastUpdateCount = (int) 
Math.min(Integer.MAX_VALUE, lastState.getUpdateCount());
                                 setLastUpdateCount(lastUpdateCount);
                                 setLastUpdateOperation(stmt.getOperation());
+                                setLastUpdateTable(tableName == null ? 
TABLE_UNKNOWN : tableName);
                                 
connection.incrementStatementExecutionCounter();
                                 if (queryLogger.isAuditLoggingEnabled()) {
                                     queryLogger.log(QueryLogInfo.TABLE_NAME_I, 
getTargetForAudit(stmt));
@@ -2477,12 +2483,31 @@ public class PhoenixStatement implements 
PhoenixMonitoredStatement, SQLCloseable
         this.lastUpdateCount = lastUpdateCount;
     }
 
+    /**
+     * @return the table name most recently stored by {@code setLastUpdateTable}
+     */
+    private String getLastUpdateTable() {
+        return this.lastUpdateTable;
+    }
+
+    /**
+     * Remembers the table touched by the last statement and reports it to the
+     * connection's activity logger.
+     * <p>
+     * A null or empty name leaves the previously stored name in place. The stored name
+     * (changed or not) is reported whenever the logger has the TABLE_NAME level enabled.
+     *
+     * @param lastUpdateTable name of the table, may be null or empty
+     */
+    private void setLastUpdateTable(String lastUpdateTable) {
+        final boolean hasName = !Strings.isNullOrEmpty(lastUpdateTable);
+        if (hasName) {
+            this.lastUpdateTable = lastUpdateTable;
+        }
+        final LogLevel requiredLevel = ActivityLogInfo.TABLE_NAME.getLogLevel();
+        if (getConnection().getActivityLogger().isLevelEnabled(requiredLevel)) {
+            updateActivityOnConnection(ActivityLogInfo.TABLE_NAME, this.lastUpdateTable);
+        }
+    }
+
+    /**
+     * @return the operation most recently stored by {@code setLastUpdateOperation}
+     */
     private Operation getLastUpdateOperation() {
         return lastUpdateOperation;
     }
 
     private void setLastUpdateOperation(Operation lastUpdateOperation) {
         this.lastUpdateOperation = lastUpdateOperation;
+        if 
(getConnection().getActivityLogger().isLevelEnabled(ActivityLogInfo.OP_NAME.getLogLevel()))
 {
+            updateActivityOnConnection(ActivityLogInfo.OP_NAME, 
this.lastUpdateOperation.toString());
+        }
+        if 
(getConnection().getActivityLogger().isLevelEnabled(ActivityLogInfo.OP_TIME.getLogLevel()))
 {
+            updateActivityOnConnection(ActivityLogInfo.OP_TIME, 
String.valueOf(EnvironmentEdgeManager.currentTimeMillis()));
+        }
     }
 
     private QueryPlan getLastQueryPlan() {
@@ -2491,8 +2516,13 @@ public class PhoenixStatement implements 
PhoenixMonitoredStatement, SQLCloseable
 
+    /**
+     * Stores the given plan in the {@code lastQueryPlan} field.
+     */
     private void setLastQueryPlan(QueryPlan lastQueryPlan) {
         this.lastQueryPlan = lastQueryPlan;
+
     }
-    
+
+    /**
+     * Forwards one activity item and its value to the activity logger of the connection
+     * that owns this statement.
+     *
+     * @param item  the activity item being recorded
+     * @param value the value to record for that item
+     */
+    private void updateActivityOnConnection(ActivityLogInfo item, String value) {
+        final PhoenixConnection owner = getConnection();
+        owner.getActivityLogger().log(item, value);
+    }
+
     private void throwIfUnallowedUserDefinedFunctions(Map<String, 
UDFParseNode> udfParseNodes) throws SQLException {
         if (!connection
                 .getQueryServices()
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/log/ActivityLogInfo.java 
b/phoenix-core/src/main/java/org/apache/phoenix/log/ActivityLogInfo.java
new file mode 100644
index 0000000000..3baf44754d
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/log/ActivityLogInfo.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.log;
+
+import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.schema.types.PInteger;
+import org.apache.phoenix.schema.types.PTimestamp;
+import org.apache.phoenix.schema.types.PVarchar;
+
+import java.util.EnumSet;
+
+
+/**
+ * The items a connection activity log can hold. Every item carries a short name, the
+ * {@link LogLevel} attached to it and the {@link PDataType} declared for its value.
+ */
+public enum ActivityLogInfo {
+
+    START_TIME("s", LogLevel.INFO, PTimestamp.INSTANCE),
+    OP_TIME("u", LogLevel.INFO, PTimestamp.INSTANCE),
+    TENANT_ID("t", LogLevel.INFO, PVarchar.INSTANCE),
+    CQS_NAME("p", LogLevel.INFO, PVarchar.INSTANCE),
+    REQUEST_ID("r", LogLevel.INFO, PVarchar.INSTANCE),
+    TABLE_NAME("n", LogLevel.INFO, PVarchar.INSTANCE),
+    OP_NAME("o", LogLevel.INFO, PVarchar.INSTANCE),
+    OP_STMTS("#", LogLevel.INFO, PInteger.INSTANCE);
+
+    /** Abbreviated name of the item. */
+    public final String shortName;
+    /** Log level attached to the item. */
+    public final LogLevel logLevel;
+    /** Data type declared for the item's value. */
+    public final PDataType dataType;
+
+    private ActivityLogInfo(String shortName, LogLevel logLevel, PDataType dataType) {
+        this.shortName = shortName;
+        this.logLevel = logLevel;
+        this.dataType = dataType;
+    }
+
+    /** @return the abbreviated name of this item */
+    public String getShortName() {
+        return this.shortName;
+    }
+
+    /** @return the log level attached to this item */
+    public LogLevel getLogLevel() {
+        return this.logLevel;
+    }
+
+    /** @return the data type declared for this item's value */
+    public PDataType getDataType() {
+        return this.dataType;
+    }
+
+}
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/log/BaseConnectionLimiter.java 
b/phoenix-core/src/main/java/org/apache/phoenix/log/BaseConnectionLimiter.java
new file mode 100644
index 0000000000..ad720c6155
--- /dev/null
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/log/BaseConnectionLimiter.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.log;
+
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.monitoring.MetricType;
+import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
+import org.apache.phoenix.thirdparty.com.google.common.base.Strings;
+import 
org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
+
+import javax.annotation.concurrent.GuardedBy;
+import java.sql.SQLException;
+
+import static 
org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_PHOENIX_CONNECTIONS_THROTTLED_COUNTER;
+
+/**
+ * Skeleton {@link ConnectionLimiter} that keeps the open-connection tallies.
+ * <ol>
+ * <li>Intended to be called only while holding
+ *     ConnectionQueryServicesImpl.connectionCountLock.</li>
+ * <li>Client and internal connections are counted separately on acquire and return.</li>
+ * <li>With throttling enabled, an acquire that would exceed its quota first calls
+ *     {@link #onSweep(boolean)}; when that reports 0 the acquire is rejected.</li>
+ * </ol>
+ */
+public abstract class BaseConnectionLimiter implements ConnectionLimiter {
+    protected int connectionCount = 0;
+    protected int internalConnectionCount = 0;
+    protected String profileName;
+    protected int maxConnectionsAllowed;
+    protected int maxInternalConnectionsAllowed;
+    protected boolean shouldThrottleNumConnections;
+
+    /**
+     * @param profileName                   name of the connection profile
+     * @param shouldThrottleNumConnections  whether quotas are enforced on acquire
+     * @param maxConnectionsAllowed         quota for client connections (0 skips the check)
+     * @param maxInternalConnectionsAllowed quota for internal connections (0 skips the check)
+     */
+    protected BaseConnectionLimiter(String profileName,
+                                    boolean shouldThrottleNumConnections,
+                                    int maxConnectionsAllowed,
+                                    int maxInternalConnectionsAllowed) {
+        this.profileName = profileName;
+        this.shouldThrottleNumConnections = shouldThrottleNumConnections;
+        this.maxConnectionsAllowed = maxConnectionsAllowed;
+        this.maxInternalConnectionsAllowed = maxInternalConnectionsAllowed;
+    }
+
+    /**
+     * Counts a newly opened connection against its quota.
+     *
+     * @param connection the connection being opened; its unique ID must not be null
+     * @throws SQLException NEW_INTERNAL_CONNECTION_THROTTLED or NEW_CONNECTION_THROTTLED
+     *                      when throttling is enabled, the quota would be exceeded and
+     *                      {@link #onSweep(boolean)} returned 0
+     */
+    @Override
+    @GuardedBy("ConnectionQueryServicesImpl.connectionCountLock")
+    public void acquireConnection(PhoenixConnection connection) throws SQLException {
+        Preconditions.checkNotNull(connection.getUniqueID(),
+                "Got null UUID for Phoenix Connection!");
+        final boolean internal = connection.isInternalConnection();
+
+        // Internal and client-created connections are checked against separate quotas.
+        if (shouldThrottleNumConnections) {
+            final int quota = internal ? maxInternalConnectionsAllowed : maxConnectionsAllowed;
+            final int wanted = 1 + (internal ? internalConnectionCount : connectionCount);
+            final boolean overQuota = quota != 0 && wanted > quota;
+            // The sweep is only attempted once the quota is actually exceeded.
+            if (overQuota && onSweep(internal) == 0) {
+                GLOBAL_PHOENIX_CONNECTIONS_THROTTLED_COUNTER.increment();
+
+                //TODO:- After PHOENIX-7038 per profile Phoenix Throttled Counter should be updated here.
+
+                final SQLExceptionCode throttledCode = internal
+                        ? SQLExceptionCode.NEW_INTERNAL_CONNECTION_THROTTLED
+                        : SQLExceptionCode.NEW_CONNECTION_THROTTLED;
+                throw new SQLExceptionInfo.Builder(throttledCode).build().buildException();
+            }
+        }
+
+        if (internal) {
+            internalConnectionCount++;
+        } else {
+            connectionCount++;
+        }
+    }
+
+    /**
+     * Removes a connection from its tally; a tally that is already 0 is left untouched.
+     *
+     * @param connection the connection being closed
+     */
+    @Override
+    @GuardedBy("ConnectionQueryServicesImpl.connectionCountLock")
+    public void returnConnection(PhoenixConnection connection) {
+        if (connection.isInternalConnection()) {
+            if (internalConnectionCount > 0) {
+                internalConnectionCount--;
+            }
+        } else if (connectionCount > 0) {
+            connectionCount--;
+        }
+    }
+
+    /**
+     * @return true when at most one connection (client plus internal) is still counted
+     */
+    @Override
+    @GuardedBy("ConnectionQueryServicesImpl.connectionCountLock")
+    public boolean isLastConnection() {
+        final int remainingAfterOne = connectionCount + internalConnectionCount - 1;
+        return remainingAfterOne <= 0;
+    }
+
+    /** @return whether quotas are enforced on acquire */
+    @Override
+    public boolean isShouldThrottleNumConnections() {
+        return this.shouldThrottleNumConnections;
+    }
+
+    /**
+     * Hook invoked when a quota would be exceeded. This base version does nothing.
+     *
+     * @param internal whether the connection being acquired is an internal one
+     * @return always 0 here, which makes the pending acquire fail
+     */
+    @Override
+    public int onSweep(boolean internal) {
+        return 0;
+    }
+
+    /** @return the current tally of client connections */
+    @VisibleForTesting
+    @GuardedBy("ConnectionQueryServicesImpl.connectionCountLock")
+    public int getConnectionCount() {
+        return this.connectionCount;
+    }
+
+    /** @return the current tally of internal connections */
+    @VisibleForTesting
+    @GuardedBy("ConnectionQueryServicesImpl.connectionCountLock")
+    public int getInternalConnectionCount() {
+        return this.internalConnectionCount;
+    }
+
+    /** @return the quota for client connections */
+    public int getMaxConnectionsAllowed() {
+        return this.maxConnectionsAllowed;
+    }
+
+    /** @return the quota for internal connections */
+    public int getMaxInternalConnectionsAllowed() {
+        return this.maxInternalConnectionsAllowed;
+    }
+
+}
+
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/log/ConnectionActivityLogger.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/log/ConnectionActivityLogger.java
new file mode 100644
index 0000000000..ee2a2a8af4
--- /dev/null
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/log/ConnectionActivityLogger.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.log;
+
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.schema.PName;
+import org.apache.phoenix.thirdparty.com.google.common.base.Strings;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+
+import java.lang.ref.WeakReference;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+
+/**
+ * Logger for connection related activities: it keeps the most recent value of each
+ * {@link ActivityLogInfo} item for one connection and can render them as one string.
+ * <p>
+ * The connection is held through a {@link WeakReference}, so {@link #getConnection()}
+ * returns null once the connection has been garbage collected.
+ * <p>
+ * NOTE(review): no synchronization is visible in this class; confirm how callers
+ * coordinate {@link #log} and {@link #getActivityLog()} across threads.
+ */
+public class ConnectionActivityLogger {
+    private LogLevel logLevel;
+    private boolean isInternalConnection;
+    private UUID connectionID;
+    private WeakReference<PhoenixConnection> connectionReference;
+    // One slot per ActivityLogInfo constant, indexed by ordinal; "" marks "not recorded".
+    List<String> activityList =
+            Stream.of(ActivityLogInfo.values()).map(f -> "").collect(Collectors.toList());
+
+    /**
+     * Creates a logger for the given connection, installs it on that connection and
+     * records the start time and, when present, the tenant id.
+     *
+     * @param connection the connection whose activity is tracked
+     * @param level      the most verbose level this logger records
+     */
+    public ConnectionActivityLogger(PhoenixConnection connection, LogLevel level) {
+        logLevel = level;
+        this.isInternalConnection = connection.isInternalConnection();
+        this.connectionID = connection.getUniqueID();
+        this.connectionReference = new WeakReference<PhoenixConnection>(connection);
+        connection.setActivityLogger(this);
+        log(ActivityLogInfo.START_TIME,
+                String.valueOf(EnvironmentEdgeManager.currentTimeMillis()));
+        PName tenantName = connection.getTenantId();
+        if (tenantName != null) {
+            log(ActivityLogInfo.TENANT_ID, tenantName.getString());
+        }
+        // TODO: CQS_NAME (Connection Profile Name)
+
+    }
+
+    /**
+     * Creates a logger that is switched off and is not bound to any connection.
+     */
+    public ConnectionActivityLogger() {
+        logLevel = LogLevel.OFF;
+    }
+
+    /** Shared logger that records nothing and reports every level as disabled. */
+    public static final ConnectionActivityLogger NO_OP_LOGGER = new ConnectionActivityLogger() {
+
+        @Override
+        public void log(ActivityLogInfo activity, String info) {}
+
+        @Override
+        public boolean isDebugEnabled() {
+            return false;
+        }
+
+        @Override
+        public boolean isInfoEnabled() {
+            return false;
+        }
+
+        @Override
+        public boolean isLevelEnabled(LogLevel logLevel) {
+            return false;
+        }
+
+        @Override
+        public boolean isInternalConnection() {
+            return false;
+        }
+
+        @Override
+        public PhoenixConnection getConnection() { return null; }
+
+        @Override
+        public String getActivityLog() {return "";}
+
+        @Override
+        public String getConnectionID() {return "";}
+
+    };
+
+    /**
+     * @return the connection's unique ID as a string, or "" when this logger was created
+     *         without a connection
+     */
+    public String getConnectionID() {
+        return connectionID == null ? "" : connectionID.toString();
+    }
+
+    /** @return whether the tracked connection is an internal connection */
+    public boolean isInternalConnection() {
+        return isInternalConnection;
+    }
+
+    /**
+     * @return the tracked connection, or null when it has been garbage collected or this
+     *         logger was created without a connection
+     */
+    public PhoenixConnection getConnection() {
+        return connectionReference == null ? null : connectionReference.get();
+    }
+
+    /**
+     * Set logging info for a given activity. Nothing is stored when this logger's level
+     * is {@link LogLevel#OFF}.
+     *
+     * @param activity the item to update
+     * @param info     the new value for that item
+     */
+    public void log(ActivityLogInfo activity, String info) {
+        if (logLevel == LogLevel.OFF) return;
+        activityList.set(activity.ordinal(), info);
+    }
+
+    /**
+     * Get the formatted log for external logging.
+     *
+     * @return the recorded, level-enabled items as {@code shortName=value} pairs joined
+     *         by ", "; empty when nothing qualifies
+     */
+    public String getActivityLog() {
+        // values() clones its array on every call, so take one copy for the whole pass.
+        final ActivityLogInfo[] items = ActivityLogInfo.values();
+        return IntStream
+                .range(0, items.length)
+                .filter(i -> !Strings.isNullOrEmpty(activityList.get(i))
+                        && isLevelEnabled(items[i].getLogLevel()))
+                .mapToObj(i -> items[i].shortName + "=" + activityList.get(i))
+                .collect(Collectors.joining(", "));
+    }
+
+    /**
+     * Is Info logging currently enabled?
+     * Call this method to prevent having to perform expensive operations (for example,
+     * String concatenation) when the log level is more than info.
+     *
+     * @return true when {@link LogLevel#INFO} is enabled on this logger
+     */
+    public boolean isInfoEnabled(){
+        return isLevelEnabled(LogLevel.INFO);
+    }
+
+    /**
+     * Is debug logging currently enabled?
+     * Call this method to prevent having to perform expensive operations (for example,
+     * String concatenation) when the log level is more than debug.
+     *
+     * @return true when {@link LogLevel#DEBUG} is enabled on this logger
+     */
+    public boolean isDebugEnabled(){
+        return isLevelEnabled(LogLevel.DEBUG);
+    }
+
+    /**
+     * Tells whether items of the given level are recorded by this logger.
+     *
+     * @param logLevel the level to test; null and {@link LogLevel#OFF} are never enabled
+     * @return true when the given level's ordinal does not exceed this logger's level
+     */
+    public boolean isLevelEnabled(LogLevel logLevel) {
+        if (this.logLevel == null || logLevel == null || logLevel == LogLevel.OFF) {
+            return false;
+        }
+        return logLevel.ordinal() <= this.logLevel.ordinal();
+    }
+}
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/log/ConnectionLimiter.java 
b/phoenix-core/src/main/java/org/apache/phoenix/log/ConnectionLimiter.java
new file mode 100644
index 0000000000..3bc6d465b2
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/log/ConnectionLimiter.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.log;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+
+import java.sql.SQLException;
+
+/**
+ * This interface defines the contract for storing information about Phoenix connections
+ * for debugging client-side issues like
+ * {@link org.apache.phoenix.exception.SQLExceptionCode#NEW_CONNECTION_THROTTLED}
+ */
+public interface ConnectionLimiter {
+
+    /**
+     * Called when a connection is being opened.
+     *
+     * @param connection the connection being opened
+     * @throws SQLException if the implementation refuses the connection
+     */
+    void acquireConnection(PhoenixConnection connection) throws SQLException;
+
+    /**
+     * Called when a connection is being given back.
+     *
+     * @param connection the connection being returned
+     */
+    void returnConnection(PhoenixConnection connection);
+
+    /**
+     * Sweep hook.
+     * NOTE(review): BaseConnectionLimiter treats a result of 0 as "nothing reclaimed";
+     * confirm the intended meaning of the return value for other implementations.
+     *
+     * @param internal whether the sweep concerns internal connections
+     * @return an implementation-defined count
+     */
+    int onSweep(boolean internal) ;
+
+    /**
+     * @return whether the limiter considers the current connection to be the last one
+     */
+    boolean isLastConnection();
+
+    /**
+     * @return whether this limiter throttles the number of connections
+     */
+    boolean isShouldThrottleNumConnections();
+}
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/log/DefaultConnectionLimiter.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/log/DefaultConnectionLimiter.java
new file mode 100644
index 0000000000..2045ef510e
--- /dev/null
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/log/DefaultConnectionLimiter.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.log;
+
+import org.apache.phoenix.jdbc.PhoenixConnection;
+
+import java.sql.SQLException;
+
+/**
+ * Default implementation of a ConnectionLimiter: a {@link BaseConnectionLimiter} with no
+ * additional behavior. Instances are created through {@link Builder}.
+ */
+public class DefaultConnectionLimiter extends BaseConnectionLimiter {
+    private DefaultConnectionLimiter(Builder builder) {
+        super(builder.profileName,
+                builder.shouldThrottleNumConnections,
+                builder.maxConnectionsAllowed,
+                builder.maxInternalConnectionsAllowed);
+    }
+
+    /**
+     * Fluent builder; limits that are never set keep the int default of 0.
+     */
+    public static class Builder {
+        protected String profileName;
+        protected int maxConnectionsAllowed;
+        protected int maxInternalConnectionsAllowed;
+        protected boolean shouldThrottleNumConnections;
+
+        /**
+         * @param shouldThrottleNumConnections whether the built limiter throttles
+         */
+        public Builder(boolean shouldThrottleNumConnections) {
+            this.shouldThrottleNumConnections = shouldThrottleNumConnections;
+        }
+
+        /** Sets the connection profile name and returns this builder. */
+        public Builder withConnectionProfile(String profileName) {
+            this.profileName = profileName;
+            return this;
+        }
+
+        /** Sets the client connection limit and returns this builder. */
+        public Builder withMaxAllowed(int maxAllowed) {
+            this.maxConnectionsAllowed = maxAllowed;
+            return this;
+        }
+
+        /** Sets the internal connection limit and returns this builder. */
+        public Builder withMaxInternalAllowed(int maxInternalAllowed) {
+            this.maxInternalConnectionsAllowed = maxInternalAllowed;
+            return this;
+        }
+
+        /** @return a new limiter configured from this builder */
+        public ConnectionLimiter build() {
+            return new DefaultConnectionLimiter(this);
+        }
+
+    }
+
+}
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/log/LoggingConnectionLimiter.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/log/LoggingConnectionLimiter.java
new file mode 100644
index 0000000000..e93a8a7f88
--- /dev/null
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/log/LoggingConnectionLimiter.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.log;
+
+import 
org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.SQLException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.UUID;
+
+import static 
org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_OPEN_INTERNAL_PHOENIX_CONNECTIONS;
+import static 
org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_OPEN_PHOENIX_CONNECTIONS;
+import static 
org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_PHOENIX_CONNECTIONS_THROTTLED_COUNTER;
+
+/**
+ * An implementation of a ConnectionLimiter which logs activity info at 
configured intervals
+ * for the active connections in the map when throttling threshold is reached.
+ */
+public class LoggingConnectionLimiter extends BaseConnectionLimiter {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(LoggingConnectionLimiter.class);
+    private static long MIN_IN_MILLIS = 60 * 1000;
+    protected final boolean enableActivityLogging;
+    protected final long loggingIntervalInMillis;
+    protected long lastLoggedTimeInMillis;
+    protected long lastCollectedTimeInMillis;
+    // Map Phoenix connection UUID to its connection activity logger object
+    protected final Map<UUID, ConnectionActivityLogger> 
openConnectionActivityLoggers;
+
+    private LoggingConnectionLimiter(Builder builder) {
+        super(builder.profileName, builder.shouldThrottleNumConnections, 
builder.maxConnectionsAllowed, builder.maxInternalConnectionsAllowed);
+        this.enableActivityLogging = builder.enableActivityLogging;
+        this.loggingIntervalInMillis = builder.loggingIntervalInMins * 
MIN_IN_MILLIS;
+        this.lastCollectedTimeInMillis = this.lastLoggedTimeInMillis = 
System.currentTimeMillis();
+        this.openConnectionActivityLoggers = Maps.newHashMap();
+    }
+
+    @Override
+    public void acquireConnection(PhoenixConnection connection) throws 
SQLException {
+        super.acquireConnection(connection);
+        if ((this.enableActivityLogging) && 
(this.openConnectionActivityLoggers.size() < this.maxConnectionsAllowed + 
this.maxInternalConnectionsAllowed)) {
+            ConnectionActivityLogger logger = new 
ConnectionActivityLogger(connection, LogLevel.INFO);
+            this.openConnectionActivityLoggers.put(connection.getUniqueID(), 
logger);
+        }
+    }
+
+    @Override
+    public void returnConnection(PhoenixConnection connection) {
+        super.returnConnection(connection);
+        UUID phxConnUniqueID = connection.getUniqueID();
+        Preconditions.checkNotNull(phxConnUniqueID, "Got null UUID for Phoenix 
Connection!");
+        if (this.enableActivityLogging) {
+            this.openConnectionActivityLoggers.remove(phxConnUniqueID);
+        }
+    }
+
+    @Override
+    public int onSweep(boolean internal)  {
+        long currentTimeInMillis = System.currentTimeMillis();
+        boolean shouldCollectNow = (currentTimeInMillis - 
lastCollectedTimeInMillis) >= loggingIntervalInMillis;
+        int garbageCollectedConnections = 0;
+        if (this.enableActivityLogging && shouldCollectNow)  {
+            Iterator<Map.Entry<UUID, ConnectionActivityLogger>> iterator = 
openConnectionActivityLoggers.entrySet().iterator();
+            while (iterator.hasNext()) {
+                Map.Entry<UUID, ConnectionActivityLogger> entry = 
iterator.next();
+                ConnectionActivityLogger logger = entry.getValue();
+                // check for reclaim only for connection/logger that match the 
sweep type. for e.g internal or external client connections.
+                boolean checkReclaimable = ((logger.isInternalConnection() && 
internal) || (!logger.isInternalConnection() && !internal));
+                if (checkReclaimable) {
+                    PhoenixConnection monitoredConnection = 
logger.getConnection();
+                    LOGGER.info(String.format("connection-sweep-activity-log 
for %s: %s", logger.getConnectionID(), logger.getActivityLog()));
+                    if (monitoredConnection == null) {
+                        garbageCollectedConnections += 
collectConnection(internal);
+                        iterator.remove();
+                    }
+                }
+            }
+            LOGGER.info(String.format("connection-profile-metrics-log for %s: 
internal=%s, freed=%d, current=%d, open=%d, throttled=%d",
+                    this.profileName,
+                    internal,
+                    garbageCollectedConnections,
+                    internal ? getInternalConnectionCount(): 
getConnectionCount(),
+                    internal ?
+                            
GLOBAL_OPEN_INTERNAL_PHOENIX_CONNECTIONS.getMetric().getValue() :
+                            
GLOBAL_OPEN_PHOENIX_CONNECTIONS.getMetric().getValue(),
+                    
GLOBAL_PHOENIX_CONNECTIONS_THROTTLED_COUNTER.getMetric().getValue()));
+
+            // Register the last logged time
+            lastCollectedTimeInMillis = currentTimeInMillis;
+
+        }
+        return garbageCollectedConnections;
+    }
+
+    private int collectConnection(boolean internal) {
+        if (internal && internalConnectionCount > 0) {
+            --internalConnectionCount;
+            GLOBAL_OPEN_INTERNAL_PHOENIX_CONNECTIONS.decrement();
+            return 1;
+        } else if (!internal && connectionCount > 0) {
+            --connectionCount;
+            GLOBAL_OPEN_PHOENIX_CONNECTIONS.decrement();
+            return 1;
+        }
+        return 0;
+    }
+
+    @VisibleForTesting
+    public Map<String, String> getActivityLog() throws SQLException {
+        Map<String, String> activityLog = Maps.newHashMap();
+        if (this.enableActivityLogging) {
+            for (ConnectionActivityLogger connectionLogger : 
openConnectionActivityLoggers.values()) {
+                activityLog.put(connectionLogger.getConnectionID(), 
connectionLogger.getActivityLog());
+            }
+        }
+        return activityLog;
+    }
+
+    public static class Builder {
+
+        protected String profileName;
+        protected boolean enableActivityLogging;
+        protected int loggingIntervalInMins;
+        protected int maxConnectionsAllowed;
+        protected int maxInternalConnectionsAllowed;
+        protected boolean shouldThrottleNumConnections;
+
+        public Builder(boolean shouldThrottleNumConnections) {
+            this.shouldThrottleNumConnections = shouldThrottleNumConnections;
+        }
+
+        public Builder withConnectionProfile(String profileName) {
+            this.profileName = profileName;
+            return this;
+        }
+
+        public Builder withMaxAllowed(int maxAllowed) {
+            this.maxConnectionsAllowed = maxAllowed;
+            return this;
+        }
+
+        public Builder withMaxInternalAllowed(int maxInternalAllowed) {
+            this.maxInternalConnectionsAllowed = maxInternalAllowed;
+            return this;
+        }
+
+        public Builder withLogging(boolean enabled) {
+            this.enableActivityLogging = enabled;
+            return this;
+        }
+        public Builder withLoggingIntervalInMins(int loggingIntervalInMins) {
+            this.loggingIntervalInMins = loggingIntervalInMins;
+            return this;
+        }
+
+        public ConnectionLimiter build() {
+            return new LoggingConnectionLimiter(this);
+        }
+
+    }
+}
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
index dd62ed125e..b89ad3e435 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
@@ -39,6 +39,7 @@ import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.execute.MutationState;
 import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
 import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.log.ConnectionLimiter;
 import org.apache.phoenix.log.QueryLoggerDisruptor;
 import org.apache.phoenix.parse.PFunction;
 import org.apache.phoenix.schema.PColumn;
@@ -221,4 +222,8 @@ public interface ConnectionQueryServices extends 
QueryServices, MetaDataMutated
     }
 
     PMetaData getMetaDataCache();
+    /**
+     * Returns the {@link ConnectionLimiter} of this services instance.
+     *
+     * @return the connection limiter
+     * @throws UnsupportedOperationException in this default implementation; implementations
+     *         that have a limiter override this method
+     */
+    default ConnectionLimiter getConnectionLimiter() {
+        throw new UnsupportedOperationException(
+                "getConnectionLimiter is not supported by " + getClass().getName());
+    }
+
 }
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 551be97e1c..bf161a9499 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -67,7 +67,6 @@ import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_CONSTANT;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_INDEX_ID;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_HBASE_TABLE_NAME;
 import static 
org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HCONNECTIONS_COUNTER;
-import static 
org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_PHOENIX_CONNECTIONS_THROTTLED_COUNTER;
 import static 
org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_QUERY_SERVICES_COUNTER;
 import static 
org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNTER_METADATA_INCONSISTENCY;
 import static 
org.apache.phoenix.monitoring.MetricType.NUM_SYSTEM_TABLE_RPC_FAILURES;
@@ -235,6 +234,9 @@ import 
org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
+import org.apache.phoenix.log.ConnectionLimiter;
+import org.apache.phoenix.log.DefaultConnectionLimiter;
+import org.apache.phoenix.log.LoggingConnectionLimiter;
 import org.apache.phoenix.log.QueryLoggerDisruptor;
 import org.apache.phoenix.monitoring.TableMetricsManager;
 import org.apache.phoenix.parse.PFunction;
@@ -331,6 +333,11 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
     private final GuidePostsCacheProvider
             GUIDE_POSTS_CACHE_PROVIDER = new GuidePostsCacheProvider();
     protected final Configuration config;
+
+    /**
+     * @return the {@link ConnectionInfo} held in this instance's {@code connectionInfo} field
+     */
+    public ConnectionInfo getConnectionInfo() {
+        return connectionInfo;
+    }
+
     protected final ConnectionInfo connectionInfo;
     // Copy of config.getProps(), but read-only to prevent synchronization 
that we
     // don't need.
@@ -394,6 +401,11 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
     private ServerSideRPCControllerFactory serverSideRPCControllerFactory;
     private boolean localIndexUpgradeRequired;
 
+    private final boolean enableConnectionActivityLogging;
+    private final int loggingIntervalInMins;
+
+    private final ConnectionLimiter connectionLimiter;
+
     private static interface FeatureSupported {
         boolean isSupported(ConnectionQueryServices services);
     }
@@ -487,6 +499,30 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
         this.maxInternalConnectionsAllowed = 
config.getInt(QueryServices.INTERNAL_CONNECTION_MAX_ALLOWED_CONNECTIONS,
                 
QueryServicesOptions.DEFAULT_INTERNAL_CONNECTION_MAX_ALLOWED_CONNECTIONS);
         this.shouldThrottleNumConnections = (maxConnectionsAllowed > 0) || 
(maxInternalConnectionsAllowed > 0);
+        this.enableConnectionActivityLogging =
+                config.getBoolean(CONNECTION_ACTIVITY_LOGGING_ENABLED,
+                        
QueryServicesOptions.DEFAULT_CONNECTION_ACTIVITY_LOGGING_ENABLED);
+        this.loggingIntervalInMins =
+                config.getInt(CONNECTION_ACTIVITY_LOGGING_INTERVAL,
+                        
QueryServicesOptions.DEFAULT_CONNECTION_ACTIVITY_LOGGING_INTERVAL_IN_MINS);
+
+        if (enableConnectionActivityLogging) {
+            LoggingConnectionLimiter.Builder builder = new 
LoggingConnectionLimiter.Builder(shouldThrottleNumConnections);
+            connectionLimiter = builder
+                    .withLoggingIntervalInMins(loggingIntervalInMins)
+                    .withLogging(true)
+                    .withMaxAllowed(this.maxConnectionsAllowed)
+                    .withMaxInternalAllowed(this.maxInternalConnectionsAllowed)
+                    .build();
+        } else {
+            DefaultConnectionLimiter.Builder builder = new 
DefaultConnectionLimiter.Builder(shouldThrottleNumConnections);
+            connectionLimiter = builder
+                    .withMaxAllowed(this.maxConnectionsAllowed)
+                    .withMaxInternalAllowed(this.maxInternalConnectionsAllowed)
+                    .build();
+        }
+
+
         if (!QueryUtil.isServerConnection(props)) {
             //Start queryDistruptor everytime as log level can be change at 
connection level as well, but we can avoid starting for server connections.
             try {
@@ -1917,6 +1953,10 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
         }
     }
 
+    /**
+     * Returns the limiter this instance delegates to when connections are added and removed.
+     */
+    @Override
+    @VisibleForTesting
+    public ConnectionLimiter getConnectionLimiter() {
+        return connectionLimiter;
+    }
+
     /**
      * helper function to return the exception from the RPC
      * @param controller
@@ -5641,30 +5681,7 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
     public void addConnection(PhoenixConnection connection) throws 
SQLException {
         if (returnSequenceValues || shouldThrottleNumConnections) {
             synchronized (connectionCountLock) {
-
-                /*
-                 * If we are throttling connections internal connections and 
client created connections
-                 *   are counted separately against each respective quota.
-                 */
-                if (shouldThrottleNumConnections) {
-                    int futureConnections = 1 + ( 
connection.isInternalConnection() ? internalConnectionCount : connectionCount);
-                    int allowedConnections = connection.isInternalConnection() 
? maxInternalConnectionsAllowed : maxConnectionsAllowed;
-                    if (allowedConnections != 0 && futureConnections > 
allowedConnections) {
-                        
GLOBAL_PHOENIX_CONNECTIONS_THROTTLED_COUNTER.increment();
-                        if (connection.isInternalConnection()) {
-                            throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.NEW_INTERNAL_CONNECTION_THROTTLED).
-                                    build().buildException();
-                        }
-                        throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.NEW_CONNECTION_THROTTLED).
-                                build().buildException();
-                    }
-                }
-
-                if (!connection.isInternalConnection()) {
-                    connectionCount++;
-                } else {
-                    internalConnectionCount++;
-                }
+                connectionLimiter.acquireConnection(connection);
             }
         }
         // If lease renewal isn't enabled, these are never cleaned up. 
Tracking when renewals
@@ -5680,7 +5697,7 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
             ConcurrentMap<SequenceKey,Sequence> formerSequenceMap = null;
             synchronized (connectionCountLock) {
                 if (!connection.isInternalConnection()) {
-                    if (connectionCount + internalConnectionCount - 1 <= 0) {
+                    if (connectionLimiter.isLastConnection()) {
                         if (!this.sequenceMap.isEmpty()) {
                             formerSequenceMap = this.sequenceMap;
                             this.sequenceMap = Maps.newConcurrentMap();
@@ -5695,13 +5712,9 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
                 returnAllSequences(formerSequenceMap);
             }
         }
-        if (returnSequenceValues || shouldThrottleNumConnections){ //still 
need to decrement connection count
+        if (returnSequenceValues || 
connectionLimiter.isShouldThrottleNumConnections()) { //still need to decrement 
connection count
             synchronized (connectionCountLock) {
-                if (connection.isInternalConnection() && 
internalConnectionCount > 0) {
-                    --internalConnectionCount;
-                } else if (connectionCount > 0) {
-                    --connectionCount;
-                }
+                connectionLimiter.returnConnection(connection);
             }
         }
     }
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
index 4f86efc8aa..72ee10c215 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
@@ -38,6 +38,7 @@ import 
org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
 import org.apache.phoenix.execute.MutationState;
 import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
 import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.log.ConnectionLimiter;
 import org.apache.phoenix.log.QueryLoggerDisruptor;
 import org.apache.phoenix.parse.PFunction;
 import org.apache.phoenix.parse.PSchema;
@@ -411,4 +412,8 @@ public class DelegateConnectionQueryServices extends 
DelegateQueryServices imple
     public PMetaData getMetaDataCache() {
         return getDelegate().getMetaDataCache();
     }
+
+    /** Returns the connection limiter of the delegate. */
+    @Override
+    public ConnectionLimiter getConnectionLimiter() {
+        return getDelegate().getConnectionLimiter();
+    }
 }
\ No newline at end of file
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java 
b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index f980fe041e..a273403195 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -293,6 +293,10 @@ public interface QueryServices extends SQLCloseable {
     //max number of connections from a single client to a single cluster. 0 is 
unlimited.
     public static final String INTERNAL_CONNECTION_MAX_ALLOWED_CONNECTIONS =
             "phoenix.internal.connection.max.allowed.connections";
+    //boolean: whether connection activity logging is enabled; when it is,
+    //ConnectionQueryServicesImpl builds a LoggingConnectionLimiter instead of the default one.
+    public static final String CONNECTION_ACTIVITY_LOGGING_ENABLED =
+            "phoenix.connection.activity.logging.enabled";
+    //int: connection activity logging interval, in minutes.
+    public static final String CONNECTION_ACTIVITY_LOGGING_INTERVAL =
+            "phoenix.connection.activity.logging.interval";
     public static final String DEFAULT_COLUMN_ENCODED_BYTES_ATRRIB  = 
"phoenix.default.column.encoded.bytes.attrib";
     public static final String DEFAULT_IMMUTABLE_STORAGE_SCHEME_ATTRIB  = 
"phoenix.default.immutable.storage.scheme";
     public static final String 
DEFAULT_MULTITENANT_IMMUTABLE_STORAGE_SCHEME_ATTRIB  = 
"phoenix.default.multitenant.immutable.storage.scheme";
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java 
b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index adef425d82..81a2a29004 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -28,6 +28,8 @@ import static 
org.apache.phoenix.query.QueryServices.CLIENT_METRICS_TAG;
 import static 
org.apache.phoenix.query.QueryServices.CLIENT_SPOOL_THRESHOLD_BYTES_ATTRIB;
 import static 
org.apache.phoenix.query.QueryServices.COLLECT_REQUEST_LEVEL_METRICS;
 import static org.apache.phoenix.query.QueryServices.COMMIT_STATS_ASYNC;
+import static 
org.apache.phoenix.query.QueryServices.CONNECTION_ACTIVITY_LOGGING_ENABLED;
+import static 
org.apache.phoenix.query.QueryServices.CONNECTION_ACTIVITY_LOGGING_INTERVAL;
 import static 
org.apache.phoenix.query.QueryServices.COST_BASED_OPTIMIZER_ENABLED;
 import static org.apache.phoenix.query.QueryServices.DATE_FORMAT_ATTRIB;
 import static 
org.apache.phoenix.query.QueryServices.DATE_FORMAT_TIMEZONE_ATTRIB;
@@ -343,6 +345,9 @@ public class QueryServicesOptions {
     public static final int DEFAULT_CLIENT_CONNECTION_MAX_ALLOWED_CONNECTIONS 
= 0;
     //by default, max internal connections from one client to one cluster is 
unlimited
     public static final int 
DEFAULT_INTERNAL_CONNECTION_MAX_ALLOWED_CONNECTIONS = 0;
+
+    //by default, connection activity logging is disabled
+    public static final boolean DEFAULT_CONNECTION_ACTIVITY_LOGGING_ENABLED = 
false;
+    //by default, the connection activity logging interval is 15 minutes
+    public static final int 
DEFAULT_CONNECTION_ACTIVITY_LOGGING_INTERVAL_IN_MINS = 15;
     public static final boolean DEFAULT_STATS_COLLECTION_ENABLED = true;
     public static final boolean DEFAULT_USE_STATS_FOR_PARALLELIZATION = true;
     
@@ -497,7 +502,10 @@ public class QueryServicesOptions {
             .setIfUnset(MAX_REGION_LOCATIONS_SIZE_EXPLAIN_PLAN,
                     DEFAULT_MAX_REGION_LOCATIONS_SIZE_EXPLAIN_PLAN)
             .setIfUnset(SERVER_MERGE_FOR_UNCOVERED_INDEX,
-                DEFAULT_SERVER_MERGE_FOR_UNCOVERED_INDEX);
+                DEFAULT_SERVER_MERGE_FOR_UNCOVERED_INDEX)
+            .setIfUnset(MAX_IN_LIST_SKIP_SCAN_SIZE, 
DEFAULT_MAX_IN_LIST_SKIP_SCAN_SIZE)
+            .setIfUnset(CONNECTION_ACTIVITY_LOGGING_ENABLED, 
DEFAULT_CONNECTION_ACTIVITY_LOGGING_ENABLED)
+            .setIfUnset(CONNECTION_ACTIVITY_LOGGING_INTERVAL, 
DEFAULT_CONNECTION_ACTIVITY_LOGGING_INTERVAL_IN_MINS);
 
         // HBase sets this to 1, so we reset it to something more appropriate.
         // Hopefully HBase will change this, because we can't know if a user 
set

Reply via email to