Repository: phoenix
Updated Branches:
  refs/heads/encodecolumns2 ecc157b09 -> b49fc0d1d (forced update)
PHOENIX-3563 Ensure we release ZooKeeper resources allocated by the Tephra client embedded in the Phoenix connection Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/b69b177b Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/b69b177b Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/b69b177b Branch: refs/heads/encodecolumns2 Commit: b69b177b3f5e39d1fa1c3300acfed9290cbe5c52 Parents: 91d1478 Author: Andrew Purtell <apurt...@apache.org> Authored: Wed Jan 4 16:48:44 2017 -0800 Committer: Andrew Purtell <apurt...@apache.org> Committed: Sat Jan 7 18:52:59 2017 -0800 ---------------------------------------------------------------------- .../query/ConnectionQueryServicesImpl.java | 37 ++++++++++++-------- 1 file changed, 23 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/b69b177b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java index f1de0bd..c1688c4 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java @@ -255,6 +255,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement private final boolean returnSequenceValues ; private HConnection connection; + private ZKClientService txZKClientService; private TransactionServiceClient txServiceClient; private volatile boolean initialized; private volatile int nSequenceSaltBuckets; @@ -371,15 +372,16 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement int 
timeOut = props.getInt(HConstants.ZK_SESSION_TIMEOUT, HConstants.DEFAULT_ZK_SESSION_TIMEOUT); // Create instance of the tephra zookeeper client - ZKClientService tephraZKClientService = new TephraZKClientService(zkQuorumServersString, timeOut, null, ArrayListMultimap.<String, byte[]>create()); - - ZKClientService zkClientService = ZKClientServices.delegate( - ZKClients.reWatchOnExpire( - ZKClients.retryOnFailure(tephraZKClientService, RetryStrategies.exponentialDelay(500, 2000, TimeUnit.MILLISECONDS)) - ) + txZKClientService = ZKClientServices.delegate( + ZKClients.reWatchOnExpire( + ZKClients.retryOnFailure( + new TephraZKClientService(zkQuorumServersString, timeOut, null, + ArrayListMultimap.<String, byte[]>create()), + RetryStrategies.exponentialDelay(500, 2000, TimeUnit.MILLISECONDS)) + ) ); - zkClientService.startAndWait(); - ZKDiscoveryService zkDiscoveryService = new ZKDiscoveryService(zkClientService); + txZKClientService.startAndWait(); + ZKDiscoveryService zkDiscoveryService = new ZKDiscoveryService(txZKClientService); PooledClientProvider pooledClientProvider = new PooledClientProvider( config, zkDiscoveryService); this.txServiceClient = new TransactionServiceClient(config,pooledClientProvider); @@ -390,11 +392,12 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement boolean transactionsEnabled = props.getBoolean( QueryServices.TRANSACTIONS_ENABLED, QueryServicesOptions.DEFAULT_TRANSACTIONS_ENABLED); - // only initialize the tx service client if needed + this.connection = HBaseFactoryProvider.getHConnectionFactory().createConnection(this.config); + // only initialize the tx service client if needed and if we succeeded in getting a connection + // to HBase if (transactionsEnabled) { initTxServiceClient(); } - this.connection = HBaseFactoryProvider.getHConnectionFactory().createConnection(this.config); } catch (IOException e) { throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_ESTABLISH_CONNECTION) 
.setRootCause(e).build().buildException(); @@ -464,14 +467,20 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } finally { try { childServices.clear(); - if (renewLeaseExecutor != null) { - renewLeaseExecutor.shutdownNow(); - } synchronized (latestMetaDataLock) { latestMetaData = null; latestMetaDataLock.notifyAll(); } - if (connection != null) connection.close(); + try { + // close the HBase connection + if (connection != null) connection.close(); + } finally { + if (renewLeaseExecutor != null) { + renewLeaseExecutor.shutdownNow(); + } + // shut down the tx client service if we created one to support transactions + if (this.txZKClientService != null) this.txZKClientService.stopAndWait(); + } } catch (IOException e) { if (sqlE == null) { sqlE = ServerUtil.parseServerException(e);