HBASE-15770 Stop using wangle's global executor Summary: Connection pool and connection factory now get thread pools through their constructor. This means that the client has the whole control over the threads.
Test Plan: simple-client still writes. Differential Revision: https://reviews.facebook.net/D57801 Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/d696e3d1 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/d696e3d1 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/d696e3d1 Branch: refs/heads/HBASE-14850 Commit: d696e3d114f94584404786c09cb74d69b332da32 Parents: 6f571ee Author: Elliott Clark <ecl...@apache.org> Authored: Fri May 6 14:32:16 2016 -0700 Committer: Elliott Clark <elli...@fb.com> Committed: Tue May 17 15:02:24 2016 -0700 ---------------------------------------------------------------------- .../connection/client-dispatcher.h | 3 ++- .../connection/client-handler.cc | 2 +- .../connection/connection-factory.cc | 8 +++--- .../connection/connection-factory.h | 2 +- .../connection/connection-pool-test.cc | 1 + .../connection/connection-pool.cc | 7 ++--- .../connection/connection-pool.h | 2 +- hbase-native-client/core/client.cc | 23 +++++++++++----- hbase-native-client/core/client.h | 9 +++++-- hbase-native-client/core/location-cache-test.cc | 5 ++-- hbase-native-client/core/location-cache.cc | 28 +++++++++++--------- hbase-native-client/core/location-cache.h | 8 ++++-- hbase-native-client/core/simple-client.cc | 8 +++--- hbase-native-client/serde/server-name-test.cc | 1 - 14 files changed, 63 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/d696e3d1/hbase-native-client/connection/client-dispatcher.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/client-dispatcher.h b/hbase-native-client/connection/client-dispatcher.h index 4bfb35d..2497cc7 100644 --- a/hbase-native-client/connection/client-dispatcher.h +++ b/hbase-native-client/connection/client-dispatcher.h @@ -31,7 +31,8 @@ namespace hbase { /** - * Dispatcher that assigns a call_id and then routes the response back to the future. + * Dispatcher that assigns a call_id and then routes the response back to the + * future. */ class ClientDispatcher : public wangle::ClientDispatcherBase<SerializePipeline, http://git-wip-us.apache.org/repos/asf/hbase/blob/d696e3d1/hbase-native-client/connection/client-handler.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/client-handler.cc b/hbase-native-client/connection/client-handler.cc index cae03c7..2e3fcd3 100644 --- a/hbase-native-client/connection/client-handler.cc +++ b/hbase-native-client/connection/client-handler.cc @@ -37,7 +37,7 @@ using hbase::pb::GetResponse; using google::protobuf::Message; ClientHandler::ClientHandler(std::string user_name) - : user_name_(user_name), serde_(), once_flag_(), + : user_name_(user_name), serde_(), once_flag_(std::make_unique<std::once_flag>()), resp_msgs_( make_unique<folly::AtomicHashMap< uint32_t, std::shared_ptr<google::protobuf::Message>>>(5000)) {} http://git-wip-us.apache.org/repos/asf/hbase/blob/d696e3d1/hbase-native-client/connection/connection-factory.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/connection-factory.cc b/hbase-native-client/connection/connection-factory.cc index 635d12d..beec6d5 100644 --- a/hbase-native-client/connection/connection-factory.cc +++ b/hbase-native-client/connection/connection-factory.cc @@ -19,8 +19,6 @@ #include "connection/connection-factory.h" -#include <wangle/concurrent/GlobalExecutor.h> - #include "connection/client-dispatcher.h" #include "connection/pipeline.h" #include "connection/service.h" @@ -28,9 +26,9 @@ using namespace folly; using namespace hbase; -ConnectionFactory::ConnectionFactory() - : io_pool_(std::static_pointer_cast<wangle::IOThreadPoolExecutor>( - wangle::getIOExecutor())), +ConnectionFactory::ConnectionFactory( + std::shared_ptr<wangle::IOThreadPoolExecutor> io_pool) + : io_pool_(io_pool), pipeline_factory_(std::make_shared<RpcPipelineFactory>()) {} std::shared_ptr<wangle::ClientBootstrap<SerializePipeline>> http://git-wip-us.apache.org/repos/asf/hbase/blob/d696e3d1/hbase-native-client/connection/connection-factory.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/connection-factory.h b/hbase-native-client/connection/connection-factory.h index 8b6d8d8..fb5d9fe 100644 --- a/hbase-native-client/connection/connection-factory.h +++ b/hbase-native-client/connection/connection-factory.h @@ -39,7 +39,7 @@ public: * Constructor. * There should only be one ConnectionFactory per client. */ - ConnectionFactory(); + ConnectionFactory(std::shared_ptr<wangle::IOThreadPoolExecutor> io_pool); /** Default Desctructor */ virtual ~ConnectionFactory() = default; http://git-wip-us.apache.org/repos/asf/hbase/blob/d696e3d1/hbase-native-client/connection/connection-pool-test.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/connection-pool-test.cc b/hbase-native-client/connection/connection-pool-test.cc index b1a0ba0..c0c346f 100644 --- a/hbase-native-client/connection/connection-pool-test.cc +++ b/hbase-native-client/connection/connection-pool-test.cc @@ -34,6 +34,7 @@ using ::testing::_; class MockConnectionFactory : public ConnectionFactory { public: + MockConnectionFactory() : ConnectionFactory(nullptr) {} MOCK_METHOD0(MakeBootstrap, std::shared_ptr<wangle::ClientBootstrap<SerializePipeline>>()); MOCK_METHOD3(Connect, http://git-wip-us.apache.org/repos/asf/hbase/blob/d696e3d1/hbase-native-client/connection/connection-pool.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/connection-pool.cc b/hbase-native-client/connection/connection-pool.cc index 75f343e..90e2056 100644 --- a/hbase-native-client/connection/connection-pool.cc +++ b/hbase-native-client/connection/connection-pool.cc @@ -31,9 +31,10 @@ using hbase::HBaseService; using folly::SharedMutexWritePriority; using folly::SocketAddress; -ConnectionPool::ConnectionPool() - : cf_(std::make_shared<ConnectionFactory>()), clients_(), connections_(), - map_mutex_() {} +ConnectionPool::ConnectionPool( + std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor) + : cf_(std::make_shared<ConnectionFactory>(io_executor)), clients_(), + connections_(), map_mutex_() {} ConnectionPool::ConnectionPool(std::shared_ptr<ConnectionFactory> cf) : cf_(cf), clients_(), connections_(), map_mutex_() {} http://git-wip-us.apache.org/repos/asf/hbase/blob/d696e3d1/hbase-native-client/connection/connection-pool.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/connection-pool.h b/hbase-native-client/connection/connection-pool.h index 5edd407..60f00de 100644 --- a/hbase-native-client/connection/connection-pool.h +++ b/hbase-native-client/connection/connection-pool.h @@ -61,7 +61,7 @@ struct ServerNameHash { class ConnectionPool { public: /** Create connection pool wit default connection factory */ - ConnectionPool(); + ConnectionPool(std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor); /** * Desctructor. http://git-wip-us.apache.org/repos/asf/hbase/blob/d696e3d1/hbase-native-client/core/client.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/client.cc b/hbase-native-client/core/client.cc index 4b9f844..1e80998 100644 --- a/hbase-native-client/core/client.cc +++ b/hbase-native-client/core/client.cc @@ -19,18 +19,27 @@ #include "core/client.h" -#include <folly/Logging.h> -#include <folly/Random.h> -#include <gflags/gflags.h> #include <glog/logging.h> -#include <wangle/concurrent/GlobalExecutor.h> #include <string> -#include "if/ZooKeeper.pb.h" - using namespace folly; using namespace std; using namespace hbase::pb; -namespace hbase {} +namespace hbase { + +Client::Client(std::string zk_quorum) + : cpu_executor_(std::make_shared<wangle::CPUThreadPoolExecutor>(4)), + io_executor_(std::make_shared<wangle::IOThreadPoolExecutor>( + sysconf(_SC_NPROCESSORS_ONLN))), + location_cache_(zk_quorum, cpu_executor_, io_executor_) {} + +// We can't have the threads continue running after everything is done +// that leads to an error. +Client::~Client() { + cpu_executor_->stop(); + io_executor_->stop(); +} + +} // namespace hbase http://git-wip-us.apache.org/repos/asf/hbase/blob/d696e3d1/hbase-native-client/core/client.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/client.h b/hbase-native-client/core/client.h index 4a6d23b..4db82c4 100644 --- a/hbase-native-client/core/client.h +++ b/hbase-native-client/core/client.h @@ -21,6 +21,8 @@ #include <folly/futures/Future.h> #include <folly/io/IOBuf.h> +#include <wangle/concurrent/CPUThreadPoolExecutor.h> +#include <wangle/concurrent/IOThreadPoolExecutor.h> #include <string> @@ -33,19 +35,22 @@ namespace hbase { * Client. * * This is the class that provides access to an HBase cluster. - * It is thread safe and does connection pooling. Current recommendations are to have only one Client per cluster around. + * It is thread safe and does connection pooling. Current recommendations are to + * have only one Client per cluster around. */ class Client { public: - /** * Create a new client. * @param quorum_spec Where to connect to get Zookeeper bootstrap information. */ explicit Client(std::string quorum_spec); + ~Client(); private: LocationCache location_cache_; + std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor_; + std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor_; }; } // namespace hbase http://git-wip-us.apache.org/repos/asf/hbase/blob/d696e3d1/hbase-native-client/core/location-cache-test.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/location-cache-test.cc b/hbase-native-client/core/location-cache-test.cc index 172799d..8bc6383 100644 --- a/hbase-native-client/core/location-cache-test.cc +++ b/hbase-native-client/core/location-cache-test.cc @@ -18,14 +18,15 @@ */ #include <folly/Memory.h> #include <gtest/gtest.h> -#include <wangle/concurrent/GlobalExecutor.h> #include "location-cache.h" using namespace hbase; TEST(LocationCacheTest, TestGetMetaNodeContents) { // TODO(elliott): need to make a test utility for this. - LocationCache cache{"localhost:2181", wangle::getCPUExecutor()}; + auto cpu = std::make_shared<wangle::CPUThreadPoolExecutor>(4); + auto io = std::make_shared<wangle::IOThreadPoolExecutor>(4); + LocationCache cache{"localhost:2181", cpu, io}; auto f = cache.LocateMeta(); auto result = f.get(); ASSERT_FALSE(f.hasException()); http://git-wip-us.apache.org/repos/asf/hbase/blob/d696e3d1/hbase-native-client/core/location-cache.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/location-cache.cc b/hbase-native-client/core/location-cache.cc index 583d305..6c018f9 100644 --- a/hbase-native-client/core/location-cache.cc +++ b/hbase-native-client/core/location-cache.cc @@ -20,7 +20,8 @@ #include <folly/Logging.h> #include <folly/io/IOBuf.h> -#include <wangle/concurrent/GlobalExecutor.h> +#include <wangle/concurrent/CPUThreadPoolExecutor.h> +#include <wangle/concurrent/IOThreadPoolExecutor.h> #include "connection/response.h" #include "if/Client.pb.h" @@ -48,10 +49,13 @@ using hbase::pb::RegionInfo; // TODO(eclark): make this configurable on client creation static const char META_ZNODE_NAME[] = "/hbase/meta-region-server"; -LocationCache::LocationCache(string quorum_spec, - shared_ptr<folly::Executor> executor) - : quorum_spec_(quorum_spec), executor_(executor), meta_promise_(nullptr), - meta_lock_(), cp_(), meta_util_(), zk_(nullptr) { +LocationCache::LocationCache( + std::string quorum_spec, + std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor, + std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor) + : quorum_spec_(quorum_spec), cpu_executor_(cpu_executor), + meta_promise_(nullptr), meta_lock_(), cp_(io_executor), meta_util_(), + zk_(nullptr) { zk_ = zookeeper_init(quorum_spec.c_str(), nullptr, 1000, 0, 0, 0); } @@ -79,7 +83,7 @@ void LocationCache::InvalidateMeta() { /// MUST hold the meta_lock_ void LocationCache::RefreshMetaLocation() { meta_promise_ = make_unique<SharedPromise<ServerName>>(); - executor_->add([&] { + cpu_executor_->add([&] { meta_promise_->setWith([&] { return this->ReadMetaLocation(); }); }); } @@ -109,10 +113,9 @@ ServerName LocationCache::ReadMetaLocation() { Future<std::shared_ptr<RegionLocation>> LocationCache::LocateFromMeta(const TableName &tn, const string &row) { - auto exec = wangle::getCPUExecutor(); return this->LocateMeta() - .via(exec.get()) - .then([ exec = exec, this ](ServerName sn) { return this->cp_.get(sn); }) + .via(cpu_executor_.get()) + .then([this](ServerName sn) { return this->cp_.get(sn); }) .then([&](std::shared_ptr<HBaseService> service) { return (*service)(std::move(meta_util_.MetaRequest(tn, row))); }) @@ -121,7 +124,7 @@ LocationCache::LocateFromMeta(const TableName &tn, const string &row) { // a region location. return this->CreateLocation(std::move(resp)); }) - .then([ exec = exec, this ](std::shared_ptr<RegionLocation> rl) { + .then([this](std::shared_ptr<RegionLocation> rl) { // Now fill out the connection. rl->set_service(cp_.get(rl->server_name())); return rl; @@ -129,14 +132,14 @@ LocationCache::LocateFromMeta(const TableName &tn, const string &row) { } /** - * Filter to remove a service from the location cache and the connection cache on errors + * Filter to remove a service from the location cache and the connection cache + * on errors * or on cloase. */ class RemoveServiceFilter : public ServiceFilter<std::unique_ptr<Request>, Response> { public: - /** Create a new filter. */ RemoveServiceFilter(std::shared_ptr<HBaseService> service, ServerName sn, ConnectionPool &cp) @@ -157,7 +160,6 @@ public: } } - /** Has this been closed */ virtual bool isAvailable() override { return !released && service_->isAvailable(); http://git-wip-us.apache.org/repos/asf/hbase/blob/d696e3d1/hbase-native-client/core/location-cache.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/location-cache.h b/hbase-native-client/core/location-cache.h index 88bec18..e077750 100644 --- a/hbase-native-client/core/location-cache.h +++ b/hbase-native-client/core/location-cache.h @@ -21,6 +21,8 @@ #include <folly/Executor.h> #include <folly/futures/Future.h> #include <folly/futures/SharedPromise.h> +#include <wangle/concurrent/CPUThreadPoolExecutor.h> +#include <wangle/concurrent/IOThreadPoolExecutor.h> #include <zookeeper/zookeeper.h> #include <memory> @@ -51,7 +53,8 @@ public: * @param executor The cpu executor to run on. */ LocationCache(std::string quorum_spec, - std::shared_ptr<folly::Executor> executor); + std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_exector, + std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor); /** * Destructor. * This will clean up the zookeeper connections. @@ -81,8 +84,9 @@ private: hbase::pb::ServerName ReadMetaLocation(); std::shared_ptr<RegionLocation> CreateLocation(const Response &resp); + /* data */ std::string quorum_spec_; - std::shared_ptr<folly::Executor> executor_; + std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor_; std::unique_ptr<folly::SharedPromise<hbase::pb::ServerName>> meta_promise_; std::mutex meta_lock_; MetaUtil meta_util_; http://git-wip-us.apache.org/repos/asf/hbase/blob/d696e3d1/hbase-native-client/core/simple-client.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/simple-client.cc b/hbase-native-client/core/simple-client.cc index 39c82c3..f3f6c42 100644 --- a/hbase-native-client/core/simple-client.cc +++ b/hbase-native-client/core/simple-client.cc @@ -22,7 +22,7 @@ #include <folly/futures/Future.h> #include <gflags/gflags.h> #include <wangle/concurrent/CPUThreadPoolExecutor.h> -#include <wangle/concurrent/GlobalExecutor.h> +#include <wangle/concurrent/IOThreadPoolExecutor.h> #include <atomic> #include <chrono> @@ -89,12 +89,10 @@ int main(int argc, char *argv[]) { // Set up thread pools. auto cpu_pool = std::make_shared<wangle::CPUThreadPoolExecutor>(FLAGS_threads); - wangle::setCPUExecutor(cpu_pool); auto io_pool = std::make_shared<wangle::IOThreadPoolExecutor>(5); - wangle::setIOExecutor(io_pool); // Create the cache. - LocationCache cache{FLAGS_zookeeper, cpu_pool}; + LocationCache cache{FLAGS_zookeeper, cpu_pool, io_pool}; auto row = FLAGS_row; auto tn = folly::to<TableName>(FLAGS_table); @@ -105,7 +103,7 @@ int main(int argc, char *argv[]) { auto num_puts = FLAGS_columns; auto results = std::vector<Future<Response>>{}; - uint64_t col{0}; + auto col = uint64_t{0}; for (; col < num_puts; col++) { results.push_back(folly::makeFuture(col) .via(cpu_pool.get()) http://git-wip-us.apache.org/repos/asf/hbase/blob/d696e3d1/hbase-native-client/serde/server-name-test.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/serde/server-name-test.cc b/hbase-native-client/serde/server-name-test.cc index 2281fa2..73a68d6 100644 --- a/hbase-native-client/serde/server-name-test.cc +++ b/hbase-native-client/serde/server-name-test.cc @@ -46,5 +46,4 @@ TEST(TestServerName, TestIPV6) { ASSERT_EQ("[::::1]", sn.host_name()); ASSERT_EQ(123, sn.port()); - }