HBASE-15774 Fix Upgrade lock usage in connection pool. Summary: Use upgrade lock better. Fix a clang warning around initializing the UpgradeHolder incorrectly. Remove dead code. (We'll need to add it back when there's a better plan.) Add more comments.
Test Plan: buck test --all Differential Revision: https://reviews.facebook.net/D58005 Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/f6ea4937 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/f6ea4937 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/f6ea4937 Branch: refs/heads/HBASE-14850 Commit: f6ea49374a552bc321e6954fc3e36587cbf4740f Parents: eb4cde4 Author: Elliott Clark <ecl...@apache.org> Authored: Tue May 10 17:44:41 2016 -0700 Committer: Elliott Clark <ecl...@apache.org> Committed: Mon Jul 11 16:47:26 2016 -0700 ---------------------------------------------------------------------- .../connection/connection-pool-test.cc | 12 ++-- .../connection/connection-pool.cc | 47 +++++++++++--- .../connection/connection-pool.h | 6 +- hbase-native-client/core/client.h | 2 +- hbase-native-client/core/location-cache.cc | 68 ++++---------------- 5 files changed, 62 insertions(+), 73 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/f6ea4937/hbase-native-client/connection/connection-pool-test.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/connection-pool-test.cc b/hbase-native-client/connection/connection-pool-test.cc index c0c346f..bd2d585 100644 --- a/hbase-native-client/connection/connection-pool-test.cc +++ b/hbase-native-client/connection/connection-pool-test.cc @@ -79,9 +79,9 @@ TEST(TestConnectionPool, TestOnlyCreateOnce) { sn.set_host_name(hostname); sn.set_port(port); - auto result = cp.get(sn); + auto result = cp.Get(sn); ASSERT_TRUE(result != nullptr); - result = cp.get(sn); + result = cp.Get(sn); } TEST(TestConnectionPool, TestOnlyCreateMultipleDispose) { @@ -102,13 +102,13 @@ TEST(TestConnectionPool, TestOnlyCreateMultipleDispose) { ConnectionPool cp{mock_cf}; { - auto result_one = cp.get(folly::to<ServerName>( + auto 
result_one = cp.Get(folly::to<ServerName>( hostname_one + ":" + folly::to<std::string>(port))); - auto result_two = cp.get(folly::to<ServerName>( + auto result_two = cp.Get(folly::to<ServerName>( hostname_two + ":" + folly::to<std::string>(port))); } - auto result_one = cp.get( + auto result_one = cp.Get( folly::to<ServerName>(hostname_one + ":" + folly::to<std::string>(port))); - auto result_two = cp.get( + auto result_two = cp.Get( folly::to<ServerName>(hostname_two + ":" + folly::to<std::string>(port))); } http://git-wip-us.apache.org/repos/asf/hbase/blob/f6ea4937/hbase-native-client/connection/connection-pool.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/connection-pool.cc b/hbase-native-client/connection/connection-pool.cc index 90e2056..aa3d094 100644 --- a/hbase-native-client/connection/connection-pool.cc +++ b/hbase-native-client/connection/connection-pool.cc @@ -48,25 +48,54 @@ ConnectionPool::~ConnectionPool() { clients_.clear(); } -std::shared_ptr<HBaseService> ConnectionPool::get(const ServerName &sn) { - // Create a read lock. - SharedMutexWritePriority::UpgradeHolder holder(map_mutex_); +std::shared_ptr<HBaseService> ConnectionPool::Get(const ServerName &sn) { + // Try and get th cached connection. + auto found_ptr = GetCached(sn); + // If there's no connection then create it. + if (found_ptr == nullptr) { + found_ptr = GetNew(sn); + } + return found_ptr; +} + +std::shared_ptr<HBaseService> ConnectionPool::GetCached(const ServerName &sn) { + SharedMutexWritePriority::ReadHolder holder(map_mutex_); auto found = connections_.find(sn); - if (found == connections_.end() || found->second == nullptr) { - // Move the upgradable lock into the write lock if the connection - // hasn't been found. 
- SharedMutexWritePriority::WriteHolder holder(std::move(holder)); + if (found == connections_.end()) { + return nullptr; + } + return found->second; +} + +std::shared_ptr<HBaseService> ConnectionPool::GetNew(const ServerName &sn) { + // Grab the upgrade lock. While we are double checking other readers can + // continue on + SharedMutexWritePriority::UpgradeHolder u_holder{map_mutex_}; + + // Now check if someone else created the connection before we got the lock + // This is safe since we hold the upgrade lock. + // upgrade lock is more power than the reader lock. + auto found = connections_.find(sn); + if (found != connections_.end() && found->second != nullptr) { + return found->second; + } else { + // Yeah it looks a lot like there's no connection + SharedMutexWritePriority::WriteHolder w_holder{std::move(u_holder)}; + + // Make double sure there are not stale connections hanging around. + connections_.erase(sn); + + // Nope we are the ones who should create the new connection. auto client = cf_->MakeBootstrap(); auto dispatcher = cf_->Connect(client, sn.host_name(), sn.port()); clients_.insert(std::make_pair(sn, client)); connections_.insert(std::make_pair(sn, dispatcher)); return dispatcher; } - return found->second; } -void ConnectionPool::close(const ServerName &sn) { +void ConnectionPool::Close(const ServerName &sn) { SharedMutexWritePriority::WriteHolder holder{map_mutex_}; auto found = connections_.find(sn); http://git-wip-us.apache.org/repos/asf/hbase/blob/f6ea4937/hbase-native-client/connection/connection-pool.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/connection-pool.h b/hbase-native-client/connection/connection-pool.h index 60f00de..605a81b 100644 --- a/hbase-native-client/connection/connection-pool.h +++ b/hbase-native-client/connection/connection-pool.h @@ -80,14 +80,16 @@ public: * Get a connection to the server name. Start time is ignored. 
* This can be a blocking operation for a short time. */ - std::shared_ptr<HBaseService> get(const hbase::pb::ServerName &sn); + std::shared_ptr<HBaseService> Get(const hbase::pb::ServerName &sn); /** * Close/remove a connection. */ - void close(const hbase::pb::ServerName &sn); + void Close(const hbase::pb::ServerName &sn); private: + std::shared_ptr<HBaseService> GetCached(const hbase::pb::ServerName& sn); + std::shared_ptr<HBaseService> GetNew(const hbase::pb::ServerName& sn); std::unordered_map<hbase::pb::ServerName, std::shared_ptr<HBaseService>, ServerNameHash, ServerNameEquals> connections_; http://git-wip-us.apache.org/repos/asf/hbase/blob/f6ea4937/hbase-native-client/core/client.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/client.h b/hbase-native-client/core/client.h index 4db82c4..ba24bb9 100644 --- a/hbase-native-client/core/client.h +++ b/hbase-native-client/core/client.h @@ -48,9 +48,9 @@ public: ~Client(); private: - LocationCache location_cache_; std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor_; std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor_; + LocationCache location_cache_; }; } // namespace hbase http://git-wip-us.apache.org/repos/asf/hbase/blob/f6ea4937/hbase-native-client/core/location-cache.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/location-cache.cc b/hbase-native-client/core/location-cache.cc index 9f2a0ef..6ba8add 100644 --- a/hbase-native-client/core/location-cache.cc +++ b/hbase-native-client/core/location-cache.cc @@ -115,7 +115,7 @@ Future<std::shared_ptr<RegionLocation>> LocationCache::LocateFromMeta(const TableName &tn, const string &row) { return this->LocateMeta() .via(cpu_executor_.get()) - .then([this](ServerName sn) { return this->cp_.get(sn); }) + .then([this](ServerName sn) { return this->cp_.Get(sn); }) .then([tn, row, this](std::shared_ptr<HBaseService> service) { return 
(*service)(std::move(meta_util_.MetaRequest(tn, row))); }) @@ -134,67 +134,25 @@ LocationCache::LocateFromMeta(const TableName &tn, const string &row) { }) .then([this](std::shared_ptr<RegionLocation> rl) { // Now fill out the connection. - rl->set_service(cp_.get(rl->server_name())); + rl->set_service(cp_.Get(rl->server_name())); return rl; }); } -/** - * Filter to remove a service from the location cache and the connection cache - * on errors - * or on cloase. - */ -class RemoveServiceFilter - : public ServiceFilter<std::unique_ptr<Request>, Response> { - -public: - /** Create a new filter. */ - RemoveServiceFilter(std::shared_ptr<HBaseService> service, ServerName sn, - ConnectionPool &cp) - : ServiceFilter<unique_ptr<Request>, Response>(service), sn_(sn), - cp_(cp) {} - - /** - * Close will remove the connection from all caches. - */ - folly::Future<folly::Unit> close() override { - if (!released.exchange(true)) { - return this->service_->close().then([this]() { - // TODO(eclark): remove the service from the meta cache. - this->cp_.close(this->sn_); - }); - } else { - return folly::makeFuture(); - } - } - - /** Has this been closed */ - virtual bool isAvailable() override { - return !released && service_->isAvailable(); - } - - /** Send the message. */ - folly::Future<Response> operator()(unique_ptr<Request> req) override { - // TODO(eclark): add in an on error handler that will - // remove the region location from the cache if needed. - // Also close the connection if this is likely to be an error - // that needs to get a new connection. 
- return (*this->service_)(std::move(req)); - } - -private: - std::atomic<bool> released{false}; - hbase::pb::ServerName sn_; - ConnectionPool &cp_; -}; - std::shared_ptr<RegionLocation> LocationCache::CreateLocation(const Response &resp) { auto resp_msg = static_pointer_cast<ScanResponse>(resp.resp_msg()); auto &results = resp_msg->results().Get(0); auto &cells = results.cell(); - auto ri = folly::to<RegionInfo>(cells.Get(0).value()); - auto sn = folly::to<ServerName>(cells.Get(1).value()); - return std::make_shared<RegionLocation>(cells.Get(0).row(), std::move(ri), sn, - nullptr); + + // TODO(eclark): There should probably be some better error + // handling around this. + auto cell_zero = cells.Get(0).value(); + auto cell_one = cells.Get(1).value(); + auto row = cells.Get(0).row(); + + auto region_info = folly::to<RegionInfo>(cell_zero); + auto server_name = folly::to<ServerName>(cell_one); + return std::make_shared<RegionLocation>(row, std::move(region_info), + server_name, nullptr); }