PHOENIX-4809 Only cache PhoenixConnections when lease renewal is on Lease renewal is the only mechanism under which connections are removed from the connectionQueue. Calling close() on a connection doesn't proactively remove it from the instance of ConnectionQueryServicesImpl.
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/d7d67757 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/d7d67757 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/d7d67757 Branch: refs/heads/4.x-cdh5.14 Commit: d7d67757d626759fd65efd75da3a1498a1be4f3c Parents: 205fe11 Author: Josh Elser <els...@apache.org> Authored: Wed Jul 11 22:02:46 2018 +0100 Committer: Pedro Boado <pbo...@apache.org> Committed: Sun Jul 29 10:04:05 2018 +0100 ---------------------------------------------------------------------- .../phoenix/query/ConnectionCachingIT.java | 87 ++++++++++++++++++++ .../query/ConnectionQueryServicesImpl.java | 11 ++- 2 files changed, 97 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/d7d67757/phoenix-core/src/it/java/org/apache/phoenix/query/ConnectionCachingIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/query/ConnectionCachingIT.java b/phoenix-core/src/it/java/org/apache/phoenix/query/ConnectionCachingIT.java new file mode 100644 index 0000000..b2ef052 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/query/ConnectionCachingIT.java @@ -0,0 +1,87 @@ +package org.apache.phoenix.query; + +import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; +import static org.junit.Assert.assertTrue; + +import java.lang.ref.WeakReference; +import java.sql.Connection; +import java.sql.DriverManager; +import java.util.Arrays; +import java.util.Properties; +import java.util.concurrent.LinkedBlockingQueue; + +import org.apache.phoenix.end2end.ParallelStatsEnabledIT; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.query.ConnectionQueryServices; +import org.apache.phoenix.query.ConnectionQueryServicesImpl; +import org.apache.phoenix.query.DelegateConnectionQueryServices; +import org.apache.phoenix.query.QueryServicesOptions; +import org.apache.phoenix.util.PhoenixRuntime; +import org.apache.phoenix.util.PropertiesUtil; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@RunWith(Parameterized.class) +public class ConnectionCachingIT extends ParallelStatsEnabledIT { + private static final Logger LOG = LoggerFactory.getLogger(ConnectionCachingIT.class); + + @Parameters(name= "phoenix.scanner.lease.renew.enabled={0}") + public static Iterable<String> data() { + return Arrays.asList("true", "false"); + } + + private String leaseRenewal; + + public ConnectionCachingIT(String leaseRenewalValue) { + this.leaseRenewal = leaseRenewalValue; + } + + @Test + public void test() throws Exception { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + props.put("phoenix.scanner.lease.renew.enabled", leaseRenewal); + + // The test driver works correctly, the real one doesn't. + String url = getUrl(); + url = url.replace(";" + PhoenixRuntime.PHOENIX_TEST_DRIVER_URL_PARAM, ""); + LOG.info("URL to use is: {}", url); + + Connection conn = DriverManager.getConnection(url, props); + long before = getNumCachedConnections(conn); + for (int i = 0; i < 10_000; i++) { + Connection c = DriverManager.getConnection(url, props); + c.close(); + } + Thread.sleep(QueryServicesOptions.DEFAULT_RUN_RENEW_LEASE_FREQUENCY_INTERVAL_MILLISECONDS / 2); + long after = getNumCachedConnections(conn); + for (int i = 0; i < 6; i++) { + LOG.info("Found {} connections cached", after); + if (after <= before) { + break; + } + Thread.sleep(QueryServicesOptions.DEFAULT_RUN_RENEW_LEASE_FREQUENCY_INTERVAL_MILLISECONDS / 2); + after = getNumCachedConnections(conn); + } + assertTrue("Saw " + before + " connections, but ended with " + after, after <= before); + } + + long getNumCachedConnections(Connection conn) throws Exception { + PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class); + ConnectionQueryServices cqs = pConn.getQueryServices(); + // For whatever reason, we sometimes get a delegate here, and sometimes the real thing. + if (cqs instanceof DelegateConnectionQueryServices) { + cqs = ((DelegateConnectionQueryServices) cqs).getDelegate(); + } + assertTrue("ConnectionQueryServices was a " + cqs.getClass(), cqs instanceof ConnectionQueryServicesImpl); + ConnectionQueryServicesImpl cqsi = (ConnectionQueryServicesImpl) cqs; + long cachedConnections = 0L; + for (LinkedBlockingQueue<WeakReference<PhoenixConnection>> queue : cqsi.getCachedConnections()) { + cachedConnections += queue.size(); + } + return cachedConnections; + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/d7d67757/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java index ca9ad5e..ff2891d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java @@ -4194,7 +4194,11 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement connectionCount++; } } - connectionQueues.get(getQueueIndex(connection)).add(new WeakReference<PhoenixConnection>(connection)); + // If lease renewal isn't enabled, these are never cleaned up. Tracking when renewals + // aren't enabled also (presently) has no purpose. + if (isRenewingLeasesEnabled()) { + connectionQueues.get(getQueueIndex(connection)).add(new WeakReference<PhoenixConnection>(connection)); + } } @Override @@ -4693,4 +4697,9 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } return client; } + + @VisibleForTesting + public List<LinkedBlockingQueue<WeakReference<PhoenixConnection>>> getCachedConnections() { + return connectionQueues; + } }