HBASE-18214 Replace the folly::AtomicHashMap usage in the RPC layer
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/a499d6a8 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/a499d6a8 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/a499d6a8 Branch: refs/heads/HBASE-14850 Commit: a499d6a819b2464c5e6500ef0d93808ef6f46d73 Parents: bb10756 Author: Enis Soztutar <e...@apache.org> Authored: Mon Jun 26 14:30:49 2017 -0700 Committer: Enis Soztutar <e...@apache.org> Committed: Mon Jun 26 14:30:49 2017 -0700 ---------------------------------------------------------------------- .../connection/client-dispatcher.cc | 16 ++- .../connection/client-dispatcher.h | 6 +- .../connection/client-handler.cc | 17 +-- hbase-native-client/connection/client-handler.h | 5 +- .../connection/connection-factory.cc | 8 +- hbase-native-client/utils/BUCK | 17 ++- .../utils/concurrent-map-test.cc | 36 ++++++ hbase-native-client/utils/concurrent-map.h | 125 +++++++++++++++++++ 8 files changed, 196 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/a499d6a8/hbase-native-client/connection/client-dispatcher.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/client-dispatcher.cc b/hbase-native-client/connection/client-dispatcher.cc index 35a1f7d..b9b2c34 100644 --- a/hbase-native-client/connection/client-dispatcher.cc +++ b/hbase-native-client/connection/client-dispatcher.cc @@ -25,16 +25,11 @@ using std::unique_ptr; namespace hbase { -ClientDispatcher::ClientDispatcher() : requests_(5000), current_call_id_(9) {} +ClientDispatcher::ClientDispatcher() : current_call_id_(9), requests_(5000) {} void ClientDispatcher::read(Context *ctx, unique_ptr<Response> in) { auto call_id = in->call_id(); - - auto search = requests_.find(call_id); - CHECK(search != requests_.end()); - auto p = std::move(search->second); - - requests_.erase(call_id); + auto p = requests_.find_and_erase(call_id); if (in->exception()) { p.setException(in->exception()); @@ -46,8 +41,11 @@ void ClientDispatcher::read(Context *ctx, unique_ptr<Response> in) { folly::Future<unique_ptr<Response>> ClientDispatcher::operator()(unique_ptr<Request> arg) { auto call_id = current_call_id_++; arg->set_call_id(call_id); - requests_.insert(call_id, folly::Promise<unique_ptr<Response>>{}); - auto &p = requests_.find(call_id)->second; + + // TODO: if the map is full (or we have more than hbase.client.perserver.requests.threshold) + // then throw ServerTooBusyException so that upper layers will retry. + auto &p = requests_[call_id]; + auto f = p.getFuture(); p.setInterruptHandler([call_id, this](const folly::exception_wrapper &e) { LOG(ERROR) << "e = " << call_id; http://git-wip-us.apache.org/repos/asf/hbase/blob/a499d6a8/hbase-native-client/connection/client-dispatcher.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/client-dispatcher.h b/hbase-native-client/connection/client-dispatcher.h index 857042c..1f8e6b3 100644 --- a/hbase-native-client/connection/client-dispatcher.h +++ b/hbase-native-client/connection/client-dispatcher.h @@ -19,16 +19,18 @@ #pragma once -#include <folly/AtomicHashMap.h> #include <folly/Logging.h> #include <wangle/service/ClientDispatcher.h> #include <atomic> +#include <map> #include <memory> +#include <mutex> #include "connection/pipeline.h" #include "connection/request.h" #include "connection/response.h" +#include "utils/concurrent-map.h" namespace hbase { /** @@ -51,7 +53,7 @@ class ClientDispatcher folly::Future<folly::Unit> close() override; private: - folly::AtomicHashMap<uint32_t, folly::Promise<std::unique_ptr<Response>>> requests_; + concurrent_map<uint32_t, folly::Promise<std::unique_ptr<Response>>> requests_; // Start at some number way above what could // be there for un-initialized call id counters. // http://git-wip-us.apache.org/repos/asf/hbase/blob/a499d6a8/hbase-native-client/connection/client-handler.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/client-handler.cc b/hbase-native-client/connection/client-handler.cc index 775df68..052c171 100644 --- a/hbase-native-client/connection/client-handler.cc +++ b/hbase-native-client/connection/client-handler.cc @@ -40,8 +40,9 @@ ClientHandler::ClientHandler(std::string user_name, std::shared_ptr<Codec> codec serde_(codec), server_(server), once_flag_(std::make_unique<std::once_flag>()), - resp_msgs_(std::make_unique<folly::AtomicHashMap<uint32_t, std::shared_ptr<Message>>>(5000)) { -} + resp_msgs_( + std::make_unique<concurrent_map<uint32_t, std::shared_ptr<google::protobuf::Message>>>( + 5000)) {} void ClientHandler::read(Context *ctx, std::unique_ptr<folly::IOBuf> buf) { if (LIKELY(buf != nullptr)) { @@ -53,15 +54,7 @@ void ClientHandler::read(Context *ctx, std::unique_ptr<folly::IOBuf> buf) { VLOG(3) << "Read RPC ResponseHeader size=" << used_bytes << " call_id=" << header.call_id() << " has_exception=" << header.has_exception() << ", server: " << server_; - // Get the response protobuf from the map - auto search = resp_msgs_->find(header.call_id()); - // It's an error if it's not there. - CHECK(search != resp_msgs_->end()); - auto resp_msg = search->second; - CHECK(resp_msg != nullptr); - - // Make sure we don't leak the protobuf - resp_msgs_->erase(header.call_id()); + auto resp_msg = resp_msgs_->find_and_erase(header.call_id()); // set the call_id. // This will be used to by the dispatcher to match up @@ -133,7 +126,7 @@ folly::Future<folly::Unit> ClientHandler::write(Context *ctx, std::unique_ptr<Re VLOG(3) << "Writing RPC Request:" << r->DebugString() << ", server: " << server_; // Now store the call id to response. - resp_msgs_->insert(r->call_id(), r->resp_msg()); + resp_msgs_->insert(std::make_pair(r->call_id(), r->resp_msg())); // Send the data down the pipeline. return ctx->fireWrite(serde_.Request(r->call_id(), r->method(), r->req_msg().get())); http://git-wip-us.apache.org/repos/asf/hbase/blob/a499d6a8/hbase-native-client/connection/client-handler.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/client-handler.h b/hbase-native-client/connection/client-handler.h index 4c106e0..8de3a8b 100644 --- a/hbase-native-client/connection/client-handler.h +++ b/hbase-native-client/connection/client-handler.h @@ -18,7 +18,6 @@ */ #pragma once -#include <folly/AtomicHashMap.h> #include <wangle/channel/Handler.h> #include <atomic> @@ -30,6 +29,7 @@ #include "exceptions/exception.h" #include "serde/codec.h" #include "serde/rpc.h" +#include "utils/concurrent-map.h" // Forward decs. namespace hbase { @@ -81,7 +81,6 @@ class ClientHandler std::string server_; // for logging // in flight requests - std::unique_ptr<folly::AtomicHashMap<uint32_t, std::shared_ptr<google::protobuf::Message>>> - resp_msgs_; + std::unique_ptr<concurrent_map<uint32_t, std::shared_ptr<google::protobuf::Message>>> resp_msgs_; }; } // namespace hbase http://git-wip-us.apache.org/repos/asf/hbase/blob/a499d6a8/hbase-native-client/connection/connection-factory.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/connection-factory.cc b/hbase-native-client/connection/connection-factory.cc index d1bfbce..a0c7f96 100644 --- a/hbase-native-client/connection/connection-factory.cc +++ b/hbase-native-client/connection/connection-factory.cc @@ -17,15 +17,15 @@ * */ -#include "connection/connection-factory.h" -#include "connection/sasl-handler.h" +#include <glog/logging.h> +#include <wangle/channel/Handler.h> #include <chrono> -#include <glog/logging.h> -#include <wangle/channel/Handler.h> #include "connection/client-dispatcher.h" +#include "connection/connection-factory.h" #include "connection/pipeline.h" +#include "connection/sasl-handler.h" #include "connection/service.h" using std::chrono::milliseconds; http://git-wip-us.apache.org/repos/asf/hbase/blob/a499d6a8/hbase-native-client/utils/BUCK ---------------------------------------------------------------------- diff --git a/hbase-native-client/utils/BUCK b/hbase-native-client/utils/BUCK index 788056b..96f02b8 100644 --- a/hbase-native-client/utils/BUCK +++ b/hbase-native-client/utils/BUCK @@ -20,6 +20,7 @@ cxx_library( exported_headers=[ "bytes-util.h", "connection-util.h", + "concurrent-map.h", "optional.h", "sys-util.h", "time-util.h", @@ -38,18 +39,26 @@ cxx_library( ], compiler_flags=['-Weffc++'],) cxx_test( - name="user-util-test", + name="bytes-util-test", srcs=[ - "user-util-test.cc", + "bytes-util-test.cc", ], deps=[ ":utils", ],) cxx_test( - name="bytes-util-test", + name="concurrent-map-test", srcs=[ - "bytes-util-test.cc", + "concurrent-map-test.cc", ], deps=[ ":utils", ],) +cxx_test( + name="user-util-test", + srcs=[ + "user-util-test.cc", + ], + deps=[ + ":utils", + ],) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/a499d6a8/hbase-native-client/utils/concurrent-map-test.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/utils/concurrent-map-test.cc b/hbase-native-client/utils/concurrent-map-test.cc new file mode 100644 index 0000000..588bd08 --- /dev/null +++ b/hbase-native-client/utils/concurrent-map-test.cc @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <folly/Logging.h> +#include <gtest/gtest.h> +#include <string> + +#include "utils/concurrent-map.h" + +using hbase::concurrent_map; + +TEST(TestConcurrentMap, TestFindAndErase) { + concurrent_map<std::string, std::string> map{500}; + + map.insert(std::make_pair("foo", "bar")); + auto prev = map.find_and_erase("foo"); + ASSERT_EQ("bar", prev); + + ASSERT_EQ(map.end(), map.find("foo")); +} http://git-wip-us.apache.org/repos/asf/hbase/blob/a499d6a8/hbase-native-client/utils/concurrent-map.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/utils/concurrent-map.h b/hbase-native-client/utils/concurrent-map.h new file mode 100644 index 0000000..d9703e1 --- /dev/null +++ b/hbase-native-client/utils/concurrent-map.h @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once + +#include <map> +#include <memory> +#include <shared_mutex> +#include <unordered_map> +#include <utility> + +namespace hbase { + +/** + * A concurrent version of std::unordered_map where we acquire a shared or exclusive + * lock for operations. This is NOT a highly-concurrent and scalable implementation + * since there is only one lock object. + * Replace this with tbb::concurrent_unordered_map or similar. + * + * Concurrency here is different than in Java. For example, the iterators returned from + * find() will not copy the key, value pairs. + */ +template <typename K, typename V> +class concurrent_map { + public: + typedef K key_type; + typedef V mapped_type; + typedef std::pair<const key_type, mapped_type> value_type; + typedef typename std::unordered_map<K, V>::iterator iterator; + typedef typename std::unordered_map<K, V>::const_iterator const_iterator; + + concurrent_map() : map_(), mutex_() {} + explicit concurrent_map(int32_t n) : map_(n), mutex_() {} + + void insert(const value_type& value) { + std::unique_lock<std::shared_timed_mutex> lock(mutex_); + map_.insert(value); + } + + /** + * Return the mapped object for this key. Be careful to not use the return reference + * to do assignment. I think it won't be thread safe + */ + mapped_type& at(const key_type& key) { + std::shared_lock<std::shared_timed_mutex> lock(mutex_); + iterator where = map_.find(key); + if (where == end()) { + std::runtime_error("Key not found"); + } + return where->second; + } + + mapped_type& operator[](const key_type& key) { + std::unique_lock<std::shared_timed_mutex> lock(mutex_); + iterator where = map_.find(key); + if (where == end()) { + return map_[key]; + } + return where->second; + } + + /** + * Atomically finds the entry and removes it from the map, returning + * the previously associated value. + */ + mapped_type find_and_erase(const K& key) { + std::unique_lock<std::shared_timed_mutex> lock(mutex_); + auto search = map_.find(key); + // It's an error if it's not there. + CHECK(search != end()); + auto val = std::move(search->second); + map_.erase(key); + return val; + } + + void erase(const K& key) { + std::unique_lock<std::shared_timed_mutex> lock(mutex_); + map_.erase(key); + } + + iterator begin() { return map_.begin(); } + + const_iterator begin() const { return map_.begin(); } + + const_iterator cbegin() const { return map_.begin(); } + + iterator end() { return map_.end(); } + + const_iterator end() const { return map_.end(); } + + const_iterator cend() const { return map_.end(); } + + iterator find(const K& key) { + std::shared_lock<std::shared_timed_mutex> lock(mutex_); + return map_.find(key); + } + + // TODO: find(), at() returning const_iterator + + bool empty() const { + std::unique_lock<std::shared_timed_mutex> lock(mutex_); + return map_.empty(); + } + + private: + std::shared_timed_mutex mutex_; + std::unordered_map<K, V> map_; +}; +} /* namespace hbase */