HBASE-15770 Stop using wangle's global executor

Summary: The connection pool and connection factory now receive their thread
pools through their constructors. This gives the client full control over the
threads.

Test Plan: simple-client still writes.

Differential Revision: https://reviews.facebook.net/D57801


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/d696e3d1
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/d696e3d1
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/d696e3d1

Branch: refs/heads/HBASE-14850
Commit: d696e3d114f94584404786c09cb74d69b332da32
Parents: 6f571ee
Author: Elliott Clark <ecl...@apache.org>
Authored: Fri May 6 14:32:16 2016 -0700
Committer: Elliott Clark <elli...@fb.com>
Committed: Tue May 17 15:02:24 2016 -0700

----------------------------------------------------------------------
 .../connection/client-dispatcher.h              |  3 ++-
 .../connection/client-handler.cc                |  2 +-
 .../connection/connection-factory.cc            |  8 +++---
 .../connection/connection-factory.h             |  2 +-
 .../connection/connection-pool-test.cc          |  1 +
 .../connection/connection-pool.cc               |  7 ++---
 .../connection/connection-pool.h                |  2 +-
 hbase-native-client/core/client.cc              | 23 +++++++++++-----
 hbase-native-client/core/client.h               |  9 +++++--
 hbase-native-client/core/location-cache-test.cc |  5 ++--
 hbase-native-client/core/location-cache.cc      | 28 +++++++++++---------
 hbase-native-client/core/location-cache.h       |  8 ++++--
 hbase-native-client/core/simple-client.cc       |  8 +++---
 hbase-native-client/serde/server-name-test.cc   |  1 -
 14 files changed, 63 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/d696e3d1/hbase-native-client/connection/client-dispatcher.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/client-dispatcher.h b/hbase-native-client/connection/client-dispatcher.h
index 4bfb35d..2497cc7 100644
--- a/hbase-native-client/connection/client-dispatcher.h
+++ b/hbase-native-client/connection/client-dispatcher.h
@@ -31,7 +31,8 @@
 
 namespace hbase {
 /**
- * Dispatcher that assigns a call_id and then routes the response back to the future.
+ * Dispatcher that assigns a call_id and then routes the response back to the
+ * future.
  */
 class ClientDispatcher
     : public wangle::ClientDispatcherBase<SerializePipeline,

http://git-wip-us.apache.org/repos/asf/hbase/blob/d696e3d1/hbase-native-client/connection/client-handler.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/client-handler.cc b/hbase-native-client/connection/client-handler.cc
index cae03c7..2e3fcd3 100644
--- a/hbase-native-client/connection/client-handler.cc
+++ b/hbase-native-client/connection/client-handler.cc
@@ -37,7 +37,7 @@ using hbase::pb::GetResponse;
 using google::protobuf::Message;
 
 ClientHandler::ClientHandler(std::string user_name)
-    : user_name_(user_name), serde_(), once_flag_(),
+    : user_name_(user_name), serde_(), once_flag_(std::make_unique<std::once_flag>()),
       resp_msgs_(
           make_unique<folly::AtomicHashMap<
               uint32_t, std::shared_ptr<google::protobuf::Message>>>(5000)) {}

http://git-wip-us.apache.org/repos/asf/hbase/blob/d696e3d1/hbase-native-client/connection/connection-factory.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/connection-factory.cc b/hbase-native-client/connection/connection-factory.cc
index 635d12d..beec6d5 100644
--- a/hbase-native-client/connection/connection-factory.cc
+++ b/hbase-native-client/connection/connection-factory.cc
@@ -19,8 +19,6 @@
 
 #include "connection/connection-factory.h"
 
-#include <wangle/concurrent/GlobalExecutor.h>
-
 #include "connection/client-dispatcher.h"
 #include "connection/pipeline.h"
 #include "connection/service.h"
@@ -28,9 +26,9 @@
 using namespace folly;
 using namespace hbase;
 
-ConnectionFactory::ConnectionFactory()
-    : io_pool_(std::static_pointer_cast<wangle::IOThreadPoolExecutor>(
-          wangle::getIOExecutor())),
+ConnectionFactory::ConnectionFactory(
+    std::shared_ptr<wangle::IOThreadPoolExecutor> io_pool)
+    : io_pool_(io_pool),
       pipeline_factory_(std::make_shared<RpcPipelineFactory>()) {}
 
 std::shared_ptr<wangle::ClientBootstrap<SerializePipeline>>

http://git-wip-us.apache.org/repos/asf/hbase/blob/d696e3d1/hbase-native-client/connection/connection-factory.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/connection-factory.h b/hbase-native-client/connection/connection-factory.h
index 8b6d8d8..fb5d9fe 100644
--- a/hbase-native-client/connection/connection-factory.h
+++ b/hbase-native-client/connection/connection-factory.h
@@ -39,7 +39,7 @@ public:
    * Constructor.
    * There should only be one ConnectionFactory per client.
    */
-  ConnectionFactory();
+  ConnectionFactory(std::shared_ptr<wangle::IOThreadPoolExecutor> io_pool);
   /** Default Desctructor */
   virtual ~ConnectionFactory() = default;
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/d696e3d1/hbase-native-client/connection/connection-pool-test.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/connection-pool-test.cc b/hbase-native-client/connection/connection-pool-test.cc
index b1a0ba0..c0c346f 100644
--- a/hbase-native-client/connection/connection-pool-test.cc
+++ b/hbase-native-client/connection/connection-pool-test.cc
@@ -34,6 +34,7 @@ using ::testing::_;
 
 class MockConnectionFactory : public ConnectionFactory {
 public:
+  MockConnectionFactory() : ConnectionFactory(nullptr) {}
   MOCK_METHOD0(MakeBootstrap,
                std::shared_ptr<wangle::ClientBootstrap<SerializePipeline>>());
   MOCK_METHOD3(Connect,

http://git-wip-us.apache.org/repos/asf/hbase/blob/d696e3d1/hbase-native-client/connection/connection-pool.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/connection-pool.cc b/hbase-native-client/connection/connection-pool.cc
index 75f343e..90e2056 100644
--- a/hbase-native-client/connection/connection-pool.cc
+++ b/hbase-native-client/connection/connection-pool.cc
@@ -31,9 +31,10 @@ using hbase::HBaseService;
 using folly::SharedMutexWritePriority;
 using folly::SocketAddress;
 
-ConnectionPool::ConnectionPool()
-    : cf_(std::make_shared<ConnectionFactory>()), clients_(), connections_(),
-      map_mutex_() {}
+ConnectionPool::ConnectionPool(
+    std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor)
+    : cf_(std::make_shared<ConnectionFactory>(io_executor)), clients_(),
+      connections_(), map_mutex_() {}
 ConnectionPool::ConnectionPool(std::shared_ptr<ConnectionFactory> cf)
     : cf_(cf), clients_(), connections_(), map_mutex_() {}
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/d696e3d1/hbase-native-client/connection/connection-pool.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/connection-pool.h b/hbase-native-client/connection/connection-pool.h
index 5edd407..60f00de 100644
--- a/hbase-native-client/connection/connection-pool.h
+++ b/hbase-native-client/connection/connection-pool.h
@@ -61,7 +61,7 @@ struct ServerNameHash {
 class ConnectionPool {
 public:
   /** Create connection pool wit default connection factory */
-  ConnectionPool();
+  ConnectionPool(std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor);
 
   /**
    * Desctructor.

http://git-wip-us.apache.org/repos/asf/hbase/blob/d696e3d1/hbase-native-client/core/client.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/client.cc b/hbase-native-client/core/client.cc
index 4b9f844..1e80998 100644
--- a/hbase-native-client/core/client.cc
+++ b/hbase-native-client/core/client.cc
@@ -19,18 +19,27 @@
 
 #include "core/client.h"
 
-#include <folly/Logging.h>
-#include <folly/Random.h>
-#include <gflags/gflags.h>
 #include <glog/logging.h>
-#include <wangle/concurrent/GlobalExecutor.h>
 
 #include <string>
 
-#include "if/ZooKeeper.pb.h"
-
 using namespace folly;
 using namespace std;
 using namespace hbase::pb;
 
-namespace hbase {}
+namespace hbase {
+
+Client::Client(std::string zk_quorum)
+    : cpu_executor_(std::make_shared<wangle::CPUThreadPoolExecutor>(4)),
+      io_executor_(std::make_shared<wangle::IOThreadPoolExecutor>(
+          sysconf(_SC_NPROCESSORS_ONLN))),
+      location_cache_(zk_quorum, cpu_executor_, io_executor_) {}
+
+// We can't have the threads continue running after everything is done
+// that leads to an error.
+Client::~Client() {
+  cpu_executor_->stop();
+  io_executor_->stop();
+}
+
+} // namespace hbase

http://git-wip-us.apache.org/repos/asf/hbase/blob/d696e3d1/hbase-native-client/core/client.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/client.h b/hbase-native-client/core/client.h
index 4a6d23b..4db82c4 100644
--- a/hbase-native-client/core/client.h
+++ b/hbase-native-client/core/client.h
@@ -21,6 +21,8 @@
 
 #include <folly/futures/Future.h>
 #include <folly/io/IOBuf.h>
+#include <wangle/concurrent/CPUThreadPoolExecutor.h>
+#include <wangle/concurrent/IOThreadPoolExecutor.h>
 
 #include <string>
 
@@ -33,19 +35,22 @@ namespace hbase {
  * Client.
  *
  * This is the class that provides access to an HBase cluster.
- * It is thread safe and does connection pooling. Current recommendations are to have only one Client per cluster around.
+ * It is thread safe and does connection pooling. Current recommendations are to
+ * have only one Client per cluster around.
  */
 class Client {
 public:
-
   /**
    * Create a new client.
    * @param quorum_spec Where to connect to get Zookeeper bootstrap information.
    */
   explicit Client(std::string quorum_spec);
+  ~Client();
 
 private:
   LocationCache location_cache_;
+  std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor_;
+  std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor_;
 };
 
 } // namespace hbase

http://git-wip-us.apache.org/repos/asf/hbase/blob/d696e3d1/hbase-native-client/core/location-cache-test.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/location-cache-test.cc b/hbase-native-client/core/location-cache-test.cc
index 172799d..8bc6383 100644
--- a/hbase-native-client/core/location-cache-test.cc
+++ b/hbase-native-client/core/location-cache-test.cc
@@ -18,14 +18,15 @@
  */
 #include <folly/Memory.h>
 #include <gtest/gtest.h>
-#include <wangle/concurrent/GlobalExecutor.h>
 
 #include "location-cache.h"
 using namespace hbase;
 
 TEST(LocationCacheTest, TestGetMetaNodeContents) {
   // TODO(elliott): need to make a test utility for this.
-  LocationCache cache{"localhost:2181", wangle::getCPUExecutor()};
+  auto cpu = std::make_shared<wangle::CPUThreadPoolExecutor>(4);
+  auto io = std::make_shared<wangle::IOThreadPoolExecutor>(4);
+  LocationCache cache{"localhost:2181", cpu, io};
   auto f = cache.LocateMeta();
   auto result = f.get();
   ASSERT_FALSE(f.hasException());

http://git-wip-us.apache.org/repos/asf/hbase/blob/d696e3d1/hbase-native-client/core/location-cache.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/location-cache.cc b/hbase-native-client/core/location-cache.cc
index 583d305..6c018f9 100644
--- a/hbase-native-client/core/location-cache.cc
+++ b/hbase-native-client/core/location-cache.cc
@@ -20,7 +20,8 @@
 
 #include <folly/Logging.h>
 #include <folly/io/IOBuf.h>
-#include <wangle/concurrent/GlobalExecutor.h>
+#include <wangle/concurrent/CPUThreadPoolExecutor.h>
+#include <wangle/concurrent/IOThreadPoolExecutor.h>
 
 #include "connection/response.h"
 #include "if/Client.pb.h"
@@ -48,10 +49,13 @@ using hbase::pb::RegionInfo;
 // TODO(eclark): make this configurable on client creation
 static const char META_ZNODE_NAME[] = "/hbase/meta-region-server";
 
-LocationCache::LocationCache(string quorum_spec,
-                             shared_ptr<folly::Executor> executor)
-    : quorum_spec_(quorum_spec), executor_(executor), meta_promise_(nullptr),
-      meta_lock_(), cp_(), meta_util_(), zk_(nullptr) {
+LocationCache::LocationCache(
+    std::string quorum_spec,
+    std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor,
+    std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor)
+    : quorum_spec_(quorum_spec), cpu_executor_(cpu_executor),
+      meta_promise_(nullptr), meta_lock_(), cp_(io_executor), meta_util_(),
+      zk_(nullptr) {
   zk_ = zookeeper_init(quorum_spec.c_str(), nullptr, 1000, 0, 0, 0);
 }
 
@@ -79,7 +83,7 @@ void LocationCache::InvalidateMeta() {
 /// MUST hold the meta_lock_
 void LocationCache::RefreshMetaLocation() {
   meta_promise_ = make_unique<SharedPromise<ServerName>>();
-  executor_->add([&] {
+  cpu_executor_->add([&] {
     meta_promise_->setWith([&] { return this->ReadMetaLocation(); });
   });
 }
@@ -109,10 +113,9 @@ ServerName LocationCache::ReadMetaLocation() {
 
 Future<std::shared_ptr<RegionLocation>>
 LocationCache::LocateFromMeta(const TableName &tn, const string &row) {
-  auto exec = wangle::getCPUExecutor();
   return this->LocateMeta()
-      .via(exec.get())
-      .then([ exec = exec, this ](ServerName sn) { return this->cp_.get(sn); })
+      .via(cpu_executor_.get())
+      .then([this](ServerName sn) { return this->cp_.get(sn); })
       .then([&](std::shared_ptr<HBaseService> service) {
         return (*service)(std::move(meta_util_.MetaRequest(tn, row)));
       })
@@ -121,7 +124,7 @@ LocationCache::LocateFromMeta(const TableName &tn, const string &row) {
         // a region location.
         return this->CreateLocation(std::move(resp));
       })
-      .then([ exec = exec, this ](std::shared_ptr<RegionLocation> rl) {
+      .then([this](std::shared_ptr<RegionLocation> rl) {
         // Now fill out the connection.
         rl->set_service(cp_.get(rl->server_name()));
         return rl;
@@ -129,14 +132,14 @@ LocationCache::LocateFromMeta(const TableName &tn, const string &row) {
 }
 
 /**
- * Filter to remove a service from the location cache and the connection cache on errors
+ * Filter to remove a service from the location cache and the connection cache
+ * on errors
  * or on cloase.
  */
 class RemoveServiceFilter
     : public ServiceFilter<std::unique_ptr<Request>, Response> {
 
 public:
-
   /** Create a new filter. */
   RemoveServiceFilter(std::shared_ptr<HBaseService> service, ServerName sn,
                       ConnectionPool &cp)
@@ -157,7 +160,6 @@ public:
     }
   }
 
-
   /** Has this been closed */
   virtual bool isAvailable() override {
     return !released && service_->isAvailable();

http://git-wip-us.apache.org/repos/asf/hbase/blob/d696e3d1/hbase-native-client/core/location-cache.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/location-cache.h b/hbase-native-client/core/location-cache.h
index 88bec18..e077750 100644
--- a/hbase-native-client/core/location-cache.h
+++ b/hbase-native-client/core/location-cache.h
@@ -21,6 +21,8 @@
 #include <folly/Executor.h>
 #include <folly/futures/Future.h>
 #include <folly/futures/SharedPromise.h>
+#include <wangle/concurrent/CPUThreadPoolExecutor.h>
+#include <wangle/concurrent/IOThreadPoolExecutor.h>
 #include <zookeeper/zookeeper.h>
 
 #include <memory>
@@ -51,7 +53,8 @@ public:
    * @param executor The cpu executor to run on.
    */
   LocationCache(std::string quorum_spec,
-                std::shared_ptr<folly::Executor> executor);
+                std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_exector,
+                std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor);
   /**
    * Destructor.
    * This will clean up the zookeeper connections.
@@ -81,8 +84,9 @@ private:
   hbase::pb::ServerName ReadMetaLocation();
   std::shared_ptr<RegionLocation> CreateLocation(const Response &resp);
 
+  /* data */
   std::string quorum_spec_;
-  std::shared_ptr<folly::Executor> executor_;
+  std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor_;
   std::unique_ptr<folly::SharedPromise<hbase::pb::ServerName>> meta_promise_;
   std::mutex meta_lock_;
   MetaUtil meta_util_;

http://git-wip-us.apache.org/repos/asf/hbase/blob/d696e3d1/hbase-native-client/core/simple-client.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/simple-client.cc b/hbase-native-client/core/simple-client.cc
index 39c82c3..f3f6c42 100644
--- a/hbase-native-client/core/simple-client.cc
+++ b/hbase-native-client/core/simple-client.cc
@@ -22,7 +22,7 @@
 #include <folly/futures/Future.h>
 #include <gflags/gflags.h>
 #include <wangle/concurrent/CPUThreadPoolExecutor.h>
-#include <wangle/concurrent/GlobalExecutor.h>
+#include <wangle/concurrent/IOThreadPoolExecutor.h>
 
 #include <atomic>
 #include <chrono>
@@ -89,12 +89,10 @@ int main(int argc, char *argv[]) {
   // Set up thread pools.
   auto cpu_pool =
       std::make_shared<wangle::CPUThreadPoolExecutor>(FLAGS_threads);
-  wangle::setCPUExecutor(cpu_pool);
   auto io_pool = std::make_shared<wangle::IOThreadPoolExecutor>(5);
-  wangle::setIOExecutor(io_pool);
 
   // Create the cache.
-  LocationCache cache{FLAGS_zookeeper, cpu_pool};
+  LocationCache cache{FLAGS_zookeeper, cpu_pool, io_pool};
 
   auto row = FLAGS_row;
   auto tn = folly::to<TableName>(FLAGS_table);
@@ -105,7 +103,7 @@ int main(int argc, char *argv[]) {
   auto num_puts = FLAGS_columns;
 
   auto results = std::vector<Future<Response>>{};
-  uint64_t col{0};
+  auto col = uint64_t{0};
   for (; col < num_puts; col++) {
     results.push_back(folly::makeFuture(col)
                           .via(cpu_pool.get())

http://git-wip-us.apache.org/repos/asf/hbase/blob/d696e3d1/hbase-native-client/serde/server-name-test.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/serde/server-name-test.cc b/hbase-native-client/serde/server-name-test.cc
index 2281fa2..73a68d6 100644
--- a/hbase-native-client/serde/server-name-test.cc
+++ b/hbase-native-client/serde/server-name-test.cc
@@ -46,5 +46,4 @@ TEST(TestServerName, TestIPV6) {
 
   ASSERT_EQ("[::::1]", sn.host_name());
   ASSERT_EQ(123, sn.port());
-
 }

Reply via email to