PHOENIX-4809 Only cache PhoenixConnections when lease renewal is on

Lease renewal is the only mechanism under which connections are removed
from the connectionQueue. Calling close() on a connection doesn't proactively
remove it from the instance of ConnectionQueryServicesImpl.


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/7f1c1fab
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/7f1c1fab
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/7f1c1fab

Branch: refs/heads/4.14-cdh5.14
Commit: 7f1c1fab89e7004fec98121b1cfea1f552ba2718
Parents: 07d4138
Author: Josh Elser <els...@apache.org>
Authored: Wed Jul 11 22:02:46 2018 +0100
Committer: Pedro Boado <pbo...@apache.org>
Committed: Wed Oct 17 20:42:14 2018 +0100

----------------------------------------------------------------------
 .../phoenix/query/ConnectionCachingIT.java      | 87 ++++++++++++++++++++
 .../query/ConnectionQueryServicesImpl.java      | 11 ++-
 2 files changed, 97 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/7f1c1fab/phoenix-core/src/it/java/org/apache/phoenix/query/ConnectionCachingIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/query/ConnectionCachingIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/query/ConnectionCachingIT.java
new file mode 100644
index 0000000..b2ef052
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/query/ConnectionCachingIT.java
@@ -0,0 +1,87 @@
+package org.apache.phoenix.query;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertTrue;
+
+import java.lang.ref.WeakReference;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.util.Arrays;
+import java.util.Properties;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.phoenix.end2end.ParallelStatsEnabledIT;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.query.ConnectionQueryServicesImpl;
+import org.apache.phoenix.query.DelegateConnectionQueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@RunWith(Parameterized.class)
+public class ConnectionCachingIT extends ParallelStatsEnabledIT {
+  private static final Logger LOG = 
LoggerFactory.getLogger(ConnectionCachingIT.class);
+
+  @Parameters(name= "phoenix.scanner.lease.renew.enabled={0}")
+  public static Iterable<String> data() {
+    return Arrays.asList("true", "false");
+  }
+
+  private String leaseRenewal;
+
+  public ConnectionCachingIT(String leaseRenewalValue) {
+    this.leaseRenewal = leaseRenewalValue;
+  }
+
+  @Test
+  public void test() throws Exception {
+    Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+    props.put("phoenix.scanner.lease.renew.enabled", leaseRenewal);
+
+    // The test driver works correctly, the real one doesn't.
+    String url = getUrl();
+    url = url.replace(";" + PhoenixRuntime.PHOENIX_TEST_DRIVER_URL_PARAM, "");
+    LOG.info("URL to use is: {}", url);
+
+    Connection conn = DriverManager.getConnection(url, props);
+    long before = getNumCachedConnections(conn);
+    for (int i = 0; i < 10_000; i++) {
+      Connection c = DriverManager.getConnection(url, props);
+      c.close();
+    }
+    
Thread.sleep(QueryServicesOptions.DEFAULT_RUN_RENEW_LEASE_FREQUENCY_INTERVAL_MILLISECONDS
 / 2);
+    long after = getNumCachedConnections(conn);
+    for (int i = 0; i < 6; i++) {
+      LOG.info("Found {} connections cached", after);
+      if (after <= before) {
+        break;
+      }
+      
Thread.sleep(QueryServicesOptions.DEFAULT_RUN_RENEW_LEASE_FREQUENCY_INTERVAL_MILLISECONDS
 / 2);
+      after = getNumCachedConnections(conn);
+    }
+    assertTrue("Saw " + before + " connections, but ended with " + after, 
after <= before);
+  }
+
+  long getNumCachedConnections(Connection conn) throws Exception {
+    PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class);
+    ConnectionQueryServices cqs = pConn.getQueryServices();
+    // For whatever reason, we sometimes get a delegate here, and sometimes 
the real thing.
+    if (cqs instanceof DelegateConnectionQueryServices) {
+      cqs = ((DelegateConnectionQueryServices) cqs).getDelegate();
+    }
+    assertTrue("ConnectionQueryServices was a " + cqs.getClass(), cqs 
instanceof ConnectionQueryServicesImpl);
+    ConnectionQueryServicesImpl cqsi = (ConnectionQueryServicesImpl) cqs;
+    long cachedConnections = 0L;
+    for (LinkedBlockingQueue<WeakReference<PhoenixConnection>> queue : 
cqsi.getCachedConnections()) {
+      cachedConnections += queue.size();
+    }
+    return cachedConnections;
+  }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7f1c1fab/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index f8ca7a4..852cad5 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -4165,7 +4165,11 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
                 connectionCount++;
             }
         }
-        connectionQueues.get(getQueueIndex(connection)).add(new 
WeakReference<PhoenixConnection>(connection));
+        // If lease renewal isn't enabled, these are never cleaned up. 
Tracking when renewals
+        // aren't enabled also (presently) has no purpose.
+        if (isRenewingLeasesEnabled()) {
+          connectionQueues.get(getQueueIndex(connection)).add(new 
WeakReference<PhoenixConnection>(connection));
+        }
     }
 
     @Override
@@ -4664,4 +4668,9 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
         }
         return client;
     }
+
+    @VisibleForTesting
+    public List<LinkedBlockingQueue<WeakReference<PhoenixConnection>>> 
getCachedConnections() {
+      return connectionQueues;
+    }
 }

Reply via email to