http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/core/append.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/append.h b/hbase-native-client/core/append.h deleted file mode 100644 index cf9ac24..0000000 --- a/hbase-native-client/core/append.h +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#pragma once - -#include <cstdint> -#include <map> -#include <memory> -#include <string> -#include <vector> -#include "core/cell.h" -#include "core/mutation.h" - -namespace hbase { - -/** - * @brief Mutation constructed from a row key, to which (family, qualifier, value) entries or - * pre-built Cells are attached through the Add() overloads. Copy construction and copy - * assignment forward to Mutation; the class declares no data members of its own. - */ -class Append : public Mutation { - public: - /** - * Constructors - */ - explicit Append(const std::string& row) : Mutation(row) {} - Append(const Append& cappend) : Mutation(cappend) {} - Append& operator=(const Append& cappend) { - Mutation::operator=(cappend); - return *this; - } - - ~Append() = default; - - /** - * @brief Add the specified column and value to this Append operation. 
- * @param family family name - * @param qualifier column qualifier - * @param value value to append - */ - Append& Add(const std::string& family, const std::string& qualifier, const std::string& value); - /** - * @brief Add a pre-built cell to this Append operation. - * @param cell cell to add; taken as std::unique_ptr by value, so the caller hands over ownership - */ - Append& Add(std::unique_ptr<Cell> cell); -}; - -} // namespace hbase
http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/core/async-batch-rpc-retrying-caller.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/async-batch-rpc-retrying-caller.cc b/hbase-native-client/core/async-batch-rpc-retrying-caller.cc deleted file mode 100644 index dfbf7e7..0000000 --- a/hbase-native-client/core/async-batch-rpc-retrying-caller.cc +++ /dev/null @@ -1,488 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- * - */ - -#include "core/async-batch-rpc-retrying-caller.h" -#include <glog/logging.h> -#include <limits> - -using folly::Future; -using folly::Promise; -using folly::Try; -using hbase::pb::ServerName; -using hbase::pb::TableName; -using hbase::security::User; -using std::chrono::nanoseconds; -using std::chrono::milliseconds; - -namespace hbase { - -template <typename REQ, typename RESP> -AsyncBatchRpcRetryingCaller<REQ, RESP>::AsyncBatchRpcRetryingCaller( - std::shared_ptr<AsyncConnection> conn, std::shared_ptr<folly::HHWheelTimer> retry_timer, - std::shared_ptr<TableName> table_name, const std::vector<REQ> &actions, nanoseconds pause_ns, - int32_t max_attempts, nanoseconds operation_timeout_ns, nanoseconds rpc_timeout_ns, - int32_t start_log_errors_count) - : conn_(conn), - retry_timer_(retry_timer), - table_name_(table_name), - pause_ns_(pause_ns), - operation_timeout_ns_(operation_timeout_ns), - rpc_timeout_ns_(rpc_timeout_ns), - start_log_errors_count_(start_log_errors_count) { - CHECK(conn_ != nullptr); - CHECK(retry_timer_ != nullptr); - location_cache_ = conn_->region_locator(); - rpc_client_ = conn_->rpc_client(); - cpu_pool_ = conn_->cpu_executor(); - CHECK(location_cache_ != nullptr); - CHECK(rpc_client_ != nullptr); - CHECK(cpu_pool_ != nullptr); - - max_attempts_ = ConnectionUtils::Retries2Attempts(max_attempts); - uint32_t index = 0; - for (auto row : actions) { - actions_.push_back(std::make_shared<Action>(row, index)); - Promise<RESP> prom{}; - action2promises_.insert(std::pair<uint64_t, Promise<RESP>>(index, std::move(prom))); - action2futures_.push_back(action2promises_[index++].getFuture()); - } -} - -template <typename REQ, typename RESP> -AsyncBatchRpcRetryingCaller<REQ, RESP>::~AsyncBatchRpcRetryingCaller() {} - -template <typename REQ, typename RESP> -Future<std::vector<Try<RESP>>> AsyncBatchRpcRetryingCaller<REQ, RESP>::Call() { - GroupAndSend(actions_, 1); - return collectAll(action2futures_); -} - -template <typename REQ, typename RESP> 
-int64_t AsyncBatchRpcRetryingCaller<REQ, RESP>::RemainingTimeNs() { - return operation_timeout_ns_.count() - (TimeUtil::GetNowNanos() - start_ns_); -} - -template <typename REQ, typename RESP> -void AsyncBatchRpcRetryingCaller<REQ, RESP>::LogException( - int32_t tries, std::shared_ptr<RegionRequest> region_request, - const folly::exception_wrapper &ew, std::shared_ptr<ServerName> server_name) { - if (tries > start_log_errors_count_) { - std::string regions; - regions += region_request->region_location()->region_name() + ", "; - LOG(WARNING) << "Process batch for " << regions << " in " << table_name_->namespace_() << ":" - << table_name_->qualifier() << " from " << server_name->host_name() - << " failed, tries=" << tries << ":- " << ew.what().toStdString(); - } -} - -/* Multi-region overload: logs one WARNING naming every region of the failed batch, but only - * once tries exceeds start_log_errors_count_. */ -template <typename REQ, typename RESP> -void AsyncBatchRpcRetryingCaller<REQ, RESP>::LogException( - int32_t tries, const std::vector<std::shared_ptr<RegionRequest>> &region_requests, - const folly::exception_wrapper &ew, std::shared_ptr<ServerName> server_name) { - if (tries > start_log_errors_count_) { - std::string regions; - /* Bind by reference: copying each shared_ptr only to read a name is wasted refcounting. */ - for (const auto &region_request : region_requests) { - regions += region_request->region_location()->region_name() + ", "; - } - /* Same ":- " separator as the single-region overload, so the try count and the exception - * text do not run together in the log line. */ - LOG(WARNING) << "Process batch for " << regions << " in " << table_name_->namespace_() << ":" - << table_name_->qualifier() << " from " << server_name->host_name() - << " failed, tries=" << tries << ":- " << ew.what().toStdString(); - } -} - -template <typename REQ, typename RESP> -const std::string AsyncBatchRpcRetryingCaller<REQ, RESP>::GetExtraContextForError( - std::shared_ptr<ServerName> server_name) { - return server_name ? 
server_name->ShortDebugString() : ""; -} - -template <typename REQ, typename RESP> -void AsyncBatchRpcRetryingCaller<REQ, RESP>::AddError(const std::shared_ptr<Action> &action, - const folly::exception_wrapper &ew, - std::shared_ptr<ServerName> server_name) { - ThrowableWithExtraContext twec(ew, TimeUtil::GetNowNanos(), GetExtraContextForError(server_name)); - AddAction2Error(action->original_index(), twec); -} - -template <typename REQ, typename RESP> -void AsyncBatchRpcRetryingCaller<REQ, RESP>::AddError( - const std::vector<std::shared_ptr<Action>> &actions, const folly::exception_wrapper &ew, - std::shared_ptr<ServerName> server_name) { - for (const auto action : actions) { - AddError(action, ew, server_name); - } -} - -template <typename REQ, typename RESP> -void AsyncBatchRpcRetryingCaller<REQ, RESP>::FailOne(const std::shared_ptr<Action> &action, - int32_t tries, - const folly::exception_wrapper &ew, - int64_t current_time, - const std::string extras) { - auto action_index = action->original_index(); - auto itr = action2promises_.find(action_index); - if (itr != action2promises_.end()) { - if (itr->second.isFulfilled()) { - return; - } - } - ThrowableWithExtraContext twec(ew, current_time, extras); - AddAction2Error(action_index, twec); - action2promises_[action_index].setException( - RetriesExhaustedException(tries - 1, action2errors_[action_index])); -} - -template <typename REQ, typename RESP> -void AsyncBatchRpcRetryingCaller<REQ, RESP>::FailAll( - const std::vector<std::shared_ptr<Action>> &actions, int32_t tries, - const folly::exception_wrapper &ew, std::shared_ptr<ServerName> server_name) { - for (const auto action : actions) { - FailOne(action, tries, ew, TimeUtil::GetNowNanos(), GetExtraContextForError(server_name)); - } -} - -template <typename REQ, typename RESP> -void AsyncBatchRpcRetryingCaller<REQ, RESP>::FailAll( - const std::vector<std::shared_ptr<Action>> &actions, int32_t tries) { - for (const auto action : actions) { - auto action_index = 
action->original_index(); - auto itr = action2promises_.find(action_index); - /* Skip only this action when its promise is already fulfilled (the same guard FailOne uses, - * including the end() check). Returning here would leave the promises of the remaining - * actions unfulfilled, so the collectAll() future handed out by Call() could never complete. */ - if (itr != action2promises_.end() && itr->second.isFulfilled()) { - continue; - } - action2promises_[action_index].setException( - RetriesExhaustedException(tries, action2errors_[action_index])); - } -} - -template <typename REQ, typename RESP> -void AsyncBatchRpcRetryingCaller<REQ, RESP>::AddAction2Error( - uint64_t action_index, const ThrowableWithExtraContext &twec) { - auto erritr = action2errors_.find(action_index); - if (erritr != action2errors_.end()) { - erritr->second->push_back(twec); - } else { - action2errors_[action_index] = std::make_shared<std::vector<ThrowableWithExtraContext>>(); - action2errors_[action_index]->push_back(twec); - } - return; -} - -template <typename REQ, typename RESP> -void AsyncBatchRpcRetryingCaller<REQ, RESP>::OnError(const ActionsByRegion &actions_by_region, - int32_t tries, - const folly::exception_wrapper &ew, - std::shared_ptr<ServerName> server_name) { - std::vector<std::shared_ptr<Action>> copied_actions; - std::vector<std::shared_ptr<RegionRequest>> region_requests; - for (const auto &action_by_region : actions_by_region) { - region_requests.push_back(action_by_region.second); - for (const auto &action : action_by_region.second->actions()) { - copied_actions.push_back(action); - } - } - - LogException(tries, region_requests, ew, server_name); - if ((tries >= max_attempts_) || !ExceptionUtil::ShouldRetry(ew)) { - FailAll(copied_actions, tries, ew, server_name); - return; - } - AddError(copied_actions, ew, server_name); - TryResubmit(copied_actions, tries); -} - -template <typename REQ, typename RESP> -void AsyncBatchRpcRetryingCaller<REQ, RESP>::TryResubmit( - const std::vector<std::shared_ptr<Action>> &actions, int32_t tries) { - int64_t delay_ns; - if (operation_timeout_ns_.count() > 0) { - int64_t max_delay_ns = RemainingTimeNs() - ConnectionUtils::kSleepDeltaNs; - if (max_delay_ns <= 0) { - FailAll(actions, tries); - return; - } - delay_ns = std::min(max_delay_ns, 
ConnectionUtils::GetPauseTime(pause_ns_.count(), tries - 1)); - } else { - delay_ns = ConnectionUtils::GetPauseTime(pause_ns_.count(), tries - 1); - } - - conn_->retry_executor()->add([=]() { - retry_timer_->scheduleTimeoutFn( - [=]() { conn_->cpu_executor()->add([=]() { GroupAndSend(actions, tries + 1); }); }, - milliseconds(TimeUtil::ToMillis(delay_ns))); - }); -} - -template <typename REQ, typename RESP> -Future<std::vector<Try<std::shared_ptr<RegionLocation>>>> -AsyncBatchRpcRetryingCaller<REQ, RESP>::GetRegionLocations( - const std::vector<std::shared_ptr<Action>> &actions, int64_t locate_timeout_ns) { - auto locs = std::vector<Future<std::shared_ptr<RegionLocation>>>{}; - for (auto const &action : actions) { - locs.push_back(location_cache_->LocateRegion(*table_name_, action->action()->row(), - RegionLocateType::kCurrent, locate_timeout_ns)); - } - - return collectAll(locs); -} - -template <typename REQ, typename RESP> -void AsyncBatchRpcRetryingCaller<REQ, RESP>::GroupAndSend( - const std::vector<std::shared_ptr<Action>> &actions, int32_t tries) { - int64_t locate_timeout_ns; - if (operation_timeout_ns_.count() > 0) { - locate_timeout_ns = RemainingTimeNs(); - if (locate_timeout_ns <= 0) { - FailAll(actions, tries); - return; - } - } else { - locate_timeout_ns = -1L; - } - - GetRegionLocations(actions, locate_timeout_ns) - .then([=](std::vector<Try<std::shared_ptr<RegionLocation>>> &loc) { - std::lock_guard<std::recursive_mutex> lck(multi_mutex_); - ActionsByServer actions_by_server; - std::vector<std::shared_ptr<Action>> locate_failed; - - for (uint64_t i = 0; i < loc.size(); ++i) { - auto action = actions[i]; - if (loc[i].hasValue()) { - auto region_loc = loc[i].value(); - // Add it to actions_by_server; - auto search = - actions_by_server.find(std::make_shared<ServerName>(region_loc->server_name())); - if (search != actions_by_server.end()) { - search->second->AddActionsByRegion(region_loc, action); - } else { - auto server_request = 
std::make_shared<ServerRequest>(region_loc); - server_request->AddActionsByRegion(region_loc, action); - auto server_name = std::make_shared<ServerName>(region_loc->server_name()); - actions_by_server[server_name] = server_request; - } - VLOG(5) << "rowkey [" << action->action()->row() << "] of table[" - << table_name_->ShortDebugString() << "] found in [" - << region_loc->region_name() << "]; RS[" - << region_loc->server_name().host_name() << ":" - << region_loc->server_name().port() << "];"; - } else if (loc[i].hasException()) { - folly::exception_wrapper ew = loc[i].exception(); - VLOG(1) << "GetRegionLocations() exception: " << ew.what().toStdString() - << "for index:" << i << "; tries: " << tries - << "; max_attempts_: " << max_attempts_; - // We might receive runtime error from location-cache.cc too, we are doing FailOne and - // continue next one - if (tries >= max_attempts_ || !ExceptionUtil::ShouldRetry(ew)) { - FailOne(action, tries, ew, TimeUtil::GetNowNanos(), ew.what().toStdString()); - } else { - AddError(action, loc[i].exception(), nullptr); - locate_failed.push_back(action); - } - } - } - if (!actions_by_server.empty()) { - Send(actions_by_server, tries); - } - - if (!locate_failed.empty()) { - TryResubmit(locate_failed, tries); - } - }) - .onError([=](const folly::exception_wrapper &ew) { - VLOG(1) << "GetRegionLocations() exception: " << ew.what().toStdString() - << "tries: " << tries << "; max_attempts_: " << max_attempts_; - std::lock_guard<std::recursive_mutex> lck(multi_mutex_); - if (tries >= max_attempts_ || !ExceptionUtil::ShouldRetry(ew)) { - FailAll(actions, tries, ew, nullptr); - } else { - TryResubmit(actions, tries); - } - }); - return; -} - -template <typename REQ, typename RESP> -Future<std::vector<Try<std::unique_ptr<Response>>>> -AsyncBatchRpcRetryingCaller<REQ, RESP>::GetMultiResponse(const ActionsByServer &actions_by_server) { - auto multi_calls = std::vector<Future<std::unique_ptr<hbase::Response>>>{}; - auto user = 
User::defaultUser(); - for (const auto &action_by_server : actions_by_server) { - std::unique_ptr<Request> multi_req = - RequestConverter::ToMultiRequest(action_by_server.second->actions_by_region()); - auto host = action_by_server.first->host_name(); - int port = action_by_server.first->port(); - multi_calls.push_back( - rpc_client_->AsyncCall(host, port, std::move(multi_req), user, "ClientService")); - } - return collectAll(multi_calls); -} - -template <typename REQ, typename RESP> -void AsyncBatchRpcRetryingCaller<REQ, RESP>::Send(const ActionsByServer &actions_by_server, - int32_t tries) { - int64_t remaining_ns; - if (operation_timeout_ns_.count() > 0) { - remaining_ns = RemainingTimeNs(); - if (remaining_ns <= 0) { - std::vector<std::shared_ptr<Action>> failed_actions; - for (const auto &action_by_server : actions_by_server) { - for (auto &value : action_by_server.second->actions_by_region()) { - for (const auto &failed_action : value.second->actions()) { - failed_actions.push_back(failed_action); - } - } - } - FailAll(failed_actions, tries); - return; - } - } else { - remaining_ns = std::numeric_limits<int64_t>::max(); - } - - std::vector<std::shared_ptr<Request>> multi_reqv; - for (const auto &action_by_server : actions_by_server) - multi_reqv.push_back( - std::move(RequestConverter::ToMultiRequest(action_by_server.second->actions_by_region()))); - - GetMultiResponse(actions_by_server) - .then([=](const std::vector<Try<std::unique_ptr<hbase::Response>>> &completed_responses) { - std::lock_guard<std::recursive_mutex> lck(multi_mutex_); - uint64_t num = 0; - for (const auto &action_by_server : actions_by_server) { - if (completed_responses[num].hasValue()) { - auto multi_response = - ResponseConverter::GetResults(multi_reqv[num], *completed_responses[num].value(), - action_by_server.second->actions_by_region()); - OnComplete(action_by_server.second->actions_by_region(), tries, action_by_server.first, - std::move(multi_response)); - } else if 
(completed_responses[num].hasException()) { - folly::exception_wrapper ew = completed_responses[num].exception(); - VLOG(1) << "GetMultiResponse() exception: " << ew.what().toStdString() - << " from server for action index:" << num; - OnError(action_by_server.second->actions_by_region(), tries, ew, - action_by_server.first); - } - num++; - } - }) - .onError([=](const folly::exception_wrapper &ew) { - VLOG(1) << "GetMultiResponse() exception: " << ew.what().toStdString(); - std::lock_guard<std::recursive_mutex> lck(multi_mutex_); - for (const auto &action_by_server : actions_by_server) { - OnError(action_by_server.second->actions_by_region(), tries, ew, action_by_server.first); - } - }); - return; -} - -template <typename REQ, typename RESP> -void AsyncBatchRpcRetryingCaller<REQ, RESP>::OnComplete( - const ActionsByRegion &actions_by_region, int32_t tries, - const std::shared_ptr<ServerName> server_name, - const std::unique_ptr<hbase::MultiResponse> multi_response) { - std::vector<std::shared_ptr<Action>> failed_actions; - const auto region_results = multi_response->RegionResults(); - for (const auto &action_by_region : actions_by_region) { - auto region_result_itr = region_results.find(action_by_region.first); - if (region_result_itr != region_results.end()) { - for (const auto &action : action_by_region.second->actions()) { - OnComplete(action, action_by_region.second, tries, server_name, region_result_itr->second, - failed_actions); - } - } else if (region_result_itr == region_results.end()) { - auto region_exc = multi_response->RegionException(action_by_region.first); - if (region_exc == nullptr) { - // FailAll actions for this particular region as inconsistent server response. 
So we raise - // this exception to the application - std::string err_msg = "Invalid response: Server " + server_name->ShortDebugString() + - " sent us neither results nor exceptions for " + - action_by_region.first; - VLOG(1) << err_msg; - auto ew = folly::make_exception_wrapper<std::runtime_error>(err_msg); - FailAll(action_by_region.second->actions(), tries, ew, server_name); - } else { - // Eg: org.apache.hadoop.hbase.NotServingRegionException: - LogException(tries, action_by_region.second, *region_exc, server_name); - if (tries >= max_attempts_ || !ExceptionUtil::ShouldRetry(*region_exc)) { - FailAll(action_by_region.second->actions(), tries, *region_exc, server_name); - /* Only this region is out of retries: move on to the next region instead of returning, - * so later regions are still handled and the failed_actions gathered so far are still - * resubmitted after the loop. */ - continue; - } - location_cache_->UpdateCachedLocation(*action_by_region.second->region_location(), - *region_exc); - AddError(action_by_region.second->actions(), *region_exc, server_name); - for (const auto &action : action_by_region.second->actions()) { - failed_actions.push_back(action); - } - } - } - } - if (!failed_actions.empty()) { - TryResubmit(failed_actions, tries); - } - - return; -} - -template <typename REQ, typename RESP> -void AsyncBatchRpcRetryingCaller<REQ, RESP>::OnComplete( - const std::shared_ptr<Action> &action, const std::shared_ptr<RegionRequest> &region_request, - int32_t tries, const std::shared_ptr<ServerName> &server_name, - const std::shared_ptr<RegionResult> &region_result, - std::vector<std::shared_ptr<Action>> &failed_actions) { - std::string err_msg; - try { - auto result_or_exc = region_result->ResultOrException(action->original_index()); - auto result = std::get<0>(*result_or_exc); - auto exc = std::get<1>(*result_or_exc); - if (exc != nullptr) { - LogException(tries, region_request, *exc, server_name); - if (tries >= max_attempts_ || !ExceptionUtil::ShouldRetry(*exc)) { - FailOne(action, tries, *exc, TimeUtil::GetNowNanos(), GetExtraContextForError(server_name)); - } else { - failed_actions.push_back(action); - } - } else if (result != nullptr) { - 
action2promises_[action->original_index()].setValue(std::move(result)); - } else { - std::string err_msg = "Invalid response: Server " + server_name->ShortDebugString() + - " sent us neither results nor exceptions for request @ index " + - std::to_string(action->original_index()) + ", row " + - action->action()->row() + " of " + - region_request->region_location()->region_name(); - VLOG(1) << err_msg; - auto ew = folly::make_exception_wrapper<std::runtime_error>(err_msg); - AddError(action, ew, server_name); - failed_actions.push_back(action); - } - } catch (const std::out_of_range &oor) { - // This should never occur. Error in logic. Throwing std::runtime_error from here. Will be - // retried or failed - std::string err_msg = "ResultOrException not present @ index " + - std::to_string(action->original_index()) + ", row " + - action->action()->row() + " of " + - region_request->region_location()->region_name(); - throw std::runtime_error(err_msg); - } - return; -} - -template class AsyncBatchRpcRetryingCaller<std::shared_ptr<hbase::Row>, - std::shared_ptr<hbase::Result>>; -} /* namespace hbase */ http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/core/async-batch-rpc-retrying-caller.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/async-batch-rpc-retrying-caller.h b/hbase-native-client/core/async-batch-rpc-retrying-caller.h deleted file mode 100644 index 9194b04..0000000 --- a/hbase-native-client/core/async-batch-rpc-retrying-caller.h +++ /dev/null @@ -1,185 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#pragma once - -#include <folly/ExceptionWrapper.h> -#include <folly/Format.h> -#include <folly/Try.h> -#include <folly/futures/Future.h> -#include <folly/futures/Promise.h> -#include <folly/io/IOBuf.h> -#include <folly/io/async/HHWheelTimer.h> -#include <wangle/concurrent/CPUThreadPoolExecutor.h> - -#include <algorithm> -#include <chrono> -#include <functional> -#include <map> -#include <memory> -#include <mutex> -#include <stdexcept> -#include <string> -#include <tuple> -#include <type_traits> -#include <unordered_map> -#include <utility> -#include <vector> - -#include "connection/rpc-client.h" -#include "core/action.h" -#include "core/async-connection.h" -#include "core/location-cache.h" -#include "core/multi-response.h" -#include "core/region-location.h" -#include "core/region-request.h" -#include "core/region-result.h" -#include "core/request-converter.h" -#include "core/response-converter.h" -#include "core/result.h" -#include "core/row.h" -#include "core/server-request.h" -#include "exceptions/exception.h" -#include "if/Client.pb.h" -#include "if/HBase.pb.h" -#include "security/user.h" -#include "utils/connection-util.h" -#include "utils/sys-util.h" -#include "utils/time-util.h" - -namespace hbase { -/* Equals function for ServerName */ -struct ServerNameEquals { - bool operator()(const std::shared_ptr<pb::ServerName> &lhs, - const std::shared_ptr<pb::ServerName> &rhs) const { - return (lhs->start_code() == rhs->start_code() && lhs->host_name() == rhs->host_name() && - lhs->port() == rhs->port()); - } -}; - -struct ServerNameHash { - 
/** hash */ - std::size_t operator()(const std::shared_ptr<pb::ServerName> &sn) const { - std::size_t h = 0; - boost::hash_combine(h, sn->start_code()); - boost::hash_combine(h, sn->host_name()); - boost::hash_combine(h, sn->port()); - return h; - } -}; - -/** - * Batch RPC caller parameterized on the request type (REQ) and the response type (RESP). - * Call() returns a future holding one folly::Try<RESP> per submitted action. ActionsByServer - * keys server requests by ServerName, hashed and compared on start code, host name and port. - */ -template <typename REQ, typename RESP> -class AsyncBatchRpcRetryingCaller { - public: - using ActionsByServer = - std::unordered_map<std::shared_ptr<pb::ServerName>, std::shared_ptr<ServerRequest>, - ServerNameHash, ServerNameEquals>; - using ActionsByRegion = ServerRequest::ActionsByRegion; - - AsyncBatchRpcRetryingCaller(std::shared_ptr<AsyncConnection> conn, - std::shared_ptr<folly::HHWheelTimer> retry_timer, - std::shared_ptr<pb::TableName> table_name, - const std::vector<REQ> &actions, std::chrono::nanoseconds pause_ns, - int32_t max_attempts, std::chrono::nanoseconds operation_timeout_ns, - std::chrono::nanoseconds rpc_timeout_ns, - int32_t start_log_errors_count); - - ~AsyncBatchRpcRetryingCaller(); - - folly::Future<std::vector<folly::Try<RESP>>> Call(); - - private: - int64_t RemainingTimeNs(); - - void LogException(int32_t tries, std::shared_ptr<RegionRequest> region_request, - const folly::exception_wrapper &ew, - std::shared_ptr<pb::ServerName> server_name); - - void LogException(int32_t tries, - const std::vector<std::shared_ptr<RegionRequest>> &region_requests, - const folly::exception_wrapper &ew, - std::shared_ptr<pb::ServerName> server_name); - - const std::string GetExtraContextForError(std::shared_ptr<pb::ServerName> server_name); - - void AddError(const std::shared_ptr<Action> &action, const folly::exception_wrapper &ew, - std::shared_ptr<pb::ServerName> server_name); - - void AddError(const std::vector<std::shared_ptr<Action>> &actions, - const folly::exception_wrapper &ew, std::shared_ptr<pb::ServerName> server_name); - - void FailOne(const std::shared_ptr<Action> &action, int32_t tries, - const folly::exception_wrapper &ew, int64_t current_time, const std::string extras); - - void 
FailAll(const std::vector<std::shared_ptr<Action>> &actions, int32_t tries, - const folly::exception_wrapper &ew, std::shared_ptr<pb::ServerName> server_name); - - void FailAll(const std::vector<std::shared_ptr<Action>> &actions, int32_t tries); - - void AddAction2Error(uint64_t action_index, const ThrowableWithExtraContext &twec); - - void OnError(const ActionsByRegion &actions_by_region, int32_t tries, - const folly::exception_wrapper &ew, std::shared_ptr<pb::ServerName> server_name); - - void TryResubmit(const std::vector<std::shared_ptr<Action>> &actions, int32_t tries); - - folly::Future<std::vector<folly::Try<std::shared_ptr<RegionLocation>>>> GetRegionLocations( - const std::vector<std::shared_ptr<Action>> &actions, int64_t locate_timeout_ns); - - void GroupAndSend(const std::vector<std::shared_ptr<Action>> &actions, int32_t tries); - - folly::Future<std::vector<folly::Try<std::unique_ptr<Response>>>> GetMultiResponse( - const ActionsByServer &actions_by_server); - - void Send(const ActionsByServer &actions_by_server, int32_t tries); - - void OnComplete(const ActionsByRegion &actions_by_region, int32_t tries, - const std::shared_ptr<pb::ServerName> server_name, - const std::unique_ptr<MultiResponse> multi_results); - - void OnComplete(const std::shared_ptr<Action> &action, - const std::shared_ptr<RegionRequest> ®ion_request, int32_t tries, - const std::shared_ptr<pb::ServerName> &server_name, - const std::shared_ptr<RegionResult> ®ion_result, - std::vector<std::shared_ptr<Action>> &failed_actions); - - private: - std::shared_ptr<folly::HHWheelTimer> retry_timer_; - std::shared_ptr<hbase::AsyncConnection> conn_; - std::shared_ptr<pb::TableName> table_name_; - std::vector<std::shared_ptr<Action>> actions_; - std::chrono::nanoseconds pause_ns_; - int32_t max_attempts_ = 0; - std::chrono::nanoseconds operation_timeout_ns_; - std::chrono::nanoseconds rpc_timeout_ns_; - int32_t start_log_errors_count_ = 0; - - int64_t start_ns_ = TimeUtil::GetNowNanos(); - int32_t 
tries_ = 1; - std::map<uint64_t, folly::Promise<RESP>> action2promises_; - std::vector<folly::Future<RESP>> action2futures_; - std::map<uint64_t, std::shared_ptr<std::vector<ThrowableWithExtraContext>>> action2errors_; - - std::shared_ptr<AsyncRegionLocator> location_cache_ = nullptr; - std::shared_ptr<RpcClient> rpc_client_ = nullptr; - std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_pool_ = nullptr; - - std::recursive_mutex multi_mutex_; -}; -} // namespace hbase http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/core/async-batch-rpc-retrying-test.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/async-batch-rpc-retrying-test.cc b/hbase-native-client/core/async-batch-rpc-retrying-test.cc deleted file mode 100644 index 00cf2b8..0000000 --- a/hbase-native-client/core/async-batch-rpc-retrying-test.cc +++ /dev/null @@ -1,577 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- * - */ - -#include <folly/Logging.h> -#include <folly/Memory.h> -#include <folly/futures/Future.h> -#include <folly/io/async/EventBase.h> -#include <folly/io/async/ScopedEventBaseThread.h> -#include <gtest/gtest.h> -#include <wangle/concurrent/IOThreadPoolExecutor.h> - -#include <chrono> -#include <functional> -#include <string> - -#include "connection/rpc-client.h" -#include "core/async-batch-rpc-retrying-caller.h" -#include "core/async-connection.h" -#include "core/async-rpc-retrying-caller-factory.h" -#include "core/client.h" -#include "core/connection-configuration.h" -#include "core/keyvalue-codec.h" -#include "core/region-location.h" -#include "core/result.h" -#include "exceptions/exception.h" -#include "test-util/test-util.h" -#include "utils/time-util.h" - -using hbase::AsyncRpcRetryingCallerFactory; -using hbase::AsyncConnection; -using hbase::AsyncRegionLocator; -using hbase::ConnectionConfiguration; -using hbase::Configuration; -using hbase::HBaseRpcController; -using hbase::RegionLocation; -using hbase::RegionLocateType; -using hbase::RpcClient; -using hbase::RequestConverter; -using hbase::ResponseConverter; -using hbase::Put; -using hbase::TimeUtil; -using hbase::Client; -using hbase::security::User; - -using std::chrono::nanoseconds; -using std::chrono::milliseconds; - -using namespace hbase; - -using folly::exception_wrapper; - -class AsyncBatchRpcRetryTest : public ::testing::Test { - public: - static std::unique_ptr<hbase::TestUtil> test_util; - static std::string tableName; - - static void SetUpTestCase() { - google::InstallFailureSignalHandler(); - test_util = std::make_unique<hbase::TestUtil>(); - test_util->StartMiniCluster(2); - std::vector<std::string> keys{"test0", "test100", "test200", "test300", "test400", - "test500", "test600", "test700", "test800", "test900"}; - tableName = "split-table1"; - test_util->CreateTable(tableName, "d", keys); - } -}; -std::unique_ptr<hbase::TestUtil> AsyncBatchRpcRetryTest::test_util = nullptr; 
-std::string AsyncBatchRpcRetryTest::tableName; - -class AsyncRegionLocatorBase : public AsyncRegionLocator { - public: - AsyncRegionLocatorBase() {} - explicit AsyncRegionLocatorBase(std::shared_ptr<RegionLocation> region_location) - : region_location_(region_location) {} - virtual ~AsyncRegionLocatorBase() = default; - - folly::Future<std::shared_ptr<hbase::RegionLocation>> LocateRegion(const hbase::pb::TableName &, - const std::string &row, - const RegionLocateType, - const int64_t) override { - folly::Promise<std::shared_ptr<RegionLocation>> promise; - promise.setValue(region_locations_.at(row)); - return promise.getFuture(); - } - - virtual void set_region_location(std::shared_ptr<RegionLocation> region_location) { - region_location_ = region_location; - } - - virtual void set_region_location( - const std::map<std::string, std::shared_ptr<RegionLocation>> &reg_locs) { - for (auto reg_loc : reg_locs) { - region_locations_[reg_loc.first] = reg_loc.second; - } - } - - void UpdateCachedLocation(const RegionLocation &rl, const folly::exception_wrapper &ew) override { - } - - protected: - std::shared_ptr<RegionLocation> region_location_; - std::map<std::string, std::shared_ptr<RegionLocation>> region_locations_; - std::map<std::string, uint32_t> mtries_; - std::map<std::string, uint32_t> mnum_fails_; - - void InitRetryMaps(uint32_t num_fails) { - if (mtries_.size() == 0 && mnum_fails_.size() == 0) { - for (auto reg_loc : region_locations_) { - mtries_[reg_loc.first] = 0; - mnum_fails_[reg_loc.first] = num_fails; - } - } - } -}; - -class MockAsyncRegionLocator : public AsyncRegionLocatorBase { - public: - MockAsyncRegionLocator() : AsyncRegionLocatorBase() {} - explicit MockAsyncRegionLocator(std::shared_ptr<RegionLocation> region_location) - : AsyncRegionLocatorBase(region_location) {} - virtual ~MockAsyncRegionLocator() {} -}; - -class MockWrongRegionAsyncRegionLocator : public AsyncRegionLocatorBase { - private: - uint32_t counter_ = 0; - uint32_t num_fails_ = 0; - 
uint32_t tries_ = 0; - - public: - explicit MockWrongRegionAsyncRegionLocator(uint32_t num_fails) - : AsyncRegionLocatorBase(), num_fails_(num_fails) {} - explicit MockWrongRegionAsyncRegionLocator(std::shared_ptr<RegionLocation> region_location) - : AsyncRegionLocatorBase(region_location) {} - virtual ~MockWrongRegionAsyncRegionLocator() {} - - folly::Future<std::shared_ptr<hbase::RegionLocation>> LocateRegion( - const hbase::pb::TableName &tn, const std::string &row, - const RegionLocateType locate_type = RegionLocateType::kCurrent, - const int64_t locate_ns = 0) override { - InitRetryMaps(num_fails_); - auto &tries = mtries_[row]; - auto &num_fails = mnum_fails_[row]; - if (++tries > num_fails) { - return AsyncRegionLocatorBase::LocateRegion(tn, row, locate_type, locate_ns); - } - - folly::Promise<std::shared_ptr<RegionLocation>> promise; - /* set random region name, simulating invalid region */ - auto result = std::make_shared<RegionLocation>("whatever-region-name", - region_locations_.at(row)->region_info(), - region_locations_.at(row)->server_name()); - promise.setValue(result); - return promise.getFuture(); - } -}; - -class MockFailingAsyncRegionLocator : public AsyncRegionLocatorBase { - private: - uint32_t tries_ = 0; - uint32_t num_fails_ = 0; - uint32_t counter_ = 0; - - public: - explicit MockFailingAsyncRegionLocator(uint32_t num_fails) - : AsyncRegionLocatorBase(), num_fails_(num_fails) {} - explicit MockFailingAsyncRegionLocator(std::shared_ptr<RegionLocation> region_location) - : AsyncRegionLocatorBase(region_location) {} - virtual ~MockFailingAsyncRegionLocator() {} - folly::Future<std::shared_ptr<hbase::RegionLocation>> LocateRegion( - const hbase::pb::TableName &tn, const std::string &row, - const RegionLocateType locate_type = RegionLocateType::kCurrent, - const int64_t locate_ns = 0) override { - InitRetryMaps(num_fails_); - auto &tries = mtries_[row]; - auto &num_fails = mnum_fails_[row]; - if (++tries > num_fails) { - return 
AsyncRegionLocatorBase::LocateRegion(tn, row, locate_type, locate_ns); - } - - folly::Promise<std::shared_ptr<RegionLocation>> promise; - promise.setException(std::runtime_error{"Failed to look up region location"}); - return promise.getFuture(); - } -}; - -class MockAsyncConnection : public AsyncConnection, - public std::enable_shared_from_this<MockAsyncConnection> { - public: - MockAsyncConnection(std::shared_ptr<ConnectionConfiguration> conn_conf, - std::shared_ptr<folly::HHWheelTimer> retry_timer, - std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor, - std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor, - std::shared_ptr<wangle::IOThreadPoolExecutor> retry_executor, - std::shared_ptr<RpcClient> rpc_client, - std::shared_ptr<AsyncRegionLocator> region_locator) - : conn_conf_(conn_conf), - retry_timer_(retry_timer), - cpu_executor_(cpu_executor), - io_executor_(io_executor), - retry_executor_(retry_executor), - rpc_client_(rpc_client), - region_locator_(region_locator) {} - ~MockAsyncConnection() {} - void Init() { - caller_factory_ = - std::make_shared<AsyncRpcRetryingCallerFactory>(shared_from_this(), retry_timer_); - } - - std::shared_ptr<Configuration> conf() override { return nullptr; } - std::shared_ptr<ConnectionConfiguration> connection_conf() override { return conn_conf_; } - std::shared_ptr<AsyncRpcRetryingCallerFactory> caller_factory() override { - return caller_factory_; - } - std::shared_ptr<RpcClient> rpc_client() override { return rpc_client_; } - std::shared_ptr<AsyncRegionLocator> region_locator() override { return region_locator_; } - std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor() override { return cpu_executor_; } - std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor() override { return io_executor_; } - std::shared_ptr<wangle::IOThreadPoolExecutor> retry_executor() override { - return retry_executor_; - } - - void Close() override { - retry_timer_->destroy(); - retry_executor_->stop(); - 
io_executor_->stop(); - cpu_executor_->stop(); - } - std::shared_ptr<HBaseRpcController> CreateRpcController() override { - return std::make_shared<HBaseRpcController>(); - } - - private: - std::shared_ptr<folly::HHWheelTimer> retry_timer_; - std::shared_ptr<ConnectionConfiguration> conn_conf_; - std::shared_ptr<AsyncRpcRetryingCallerFactory> caller_factory_; - std::shared_ptr<RpcClient> rpc_client_; - std::shared_ptr<AsyncRegionLocator> region_locator_; - std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor_; - std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor_; - std::shared_ptr<wangle::IOThreadPoolExecutor> retry_executor_; -}; - -class MockRawAsyncTableImpl { - public: - explicit MockRawAsyncTableImpl(std::shared_ptr<MockAsyncConnection> conn, - std::shared_ptr<hbase::pb::TableName> tn) - : conn_(conn), tn_(tn) {} - virtual ~MockRawAsyncTableImpl() = default; - - /* implement this in real RawAsyncTableImpl. */ - template <typename REQ, typename RESP> - folly::Future<std::vector<folly::Try<RESP>>> Batch(const std::vector<REQ> &rows) { - /* init request caller builder */ - auto builder = conn_->caller_factory()->Batch<REQ, RESP>(); - - /* call with retry to get result */ - auto async_caller = - builder->table(tn_) - ->actions(std::make_shared<std::vector<REQ>>(rows)) - ->rpc_timeout(conn_->connection_conf()->read_rpc_timeout()) - ->operation_timeout(conn_->connection_conf()->operation_timeout()) - ->pause(conn_->connection_conf()->pause()) - ->max_attempts(conn_->connection_conf()->max_retries()) - ->start_log_errors_count(conn_->connection_conf()->start_log_errors_count()) - ->Build(); - - return async_caller->Call().then([async_caller](auto r) { return r; }); - } - - private: - std::shared_ptr<MockAsyncConnection> conn_; - std::shared_ptr<hbase::pb::TableName> tn_; -}; - -std::shared_ptr<MockAsyncConnection> getAsyncConnection( - Client &client, uint32_t operation_timeout_millis, uint32_t tries, - std::shared_ptr<AsyncRegionLocatorBase> 
region_locator) { - /* init region location and rpc channel */ - auto cpu_executor_ = std::make_shared<wangle::CPUThreadPoolExecutor>(4); - auto io_executor_ = client.async_connection()->io_executor(); - auto retry_executor_ = std::make_shared<wangle::IOThreadPoolExecutor>(1); - auto codec = std::make_shared<hbase::KeyValueCodec>(); - auto rpc_client = std::make_shared<RpcClient>(io_executor_, cpu_executor_, codec, - AsyncBatchRpcRetryTest::test_util->conf()); - std::shared_ptr<folly::HHWheelTimer> retry_timer = - folly::HHWheelTimer::newTimer(retry_executor_->getEventBase()); - - /* init connection configuration */ - auto connection_conf = std::make_shared<ConnectionConfiguration>( - TimeUtil::SecondsToNanos(20), // connect_timeout - TimeUtil::MillisToNanos(operation_timeout_millis), // operation_timeout - TimeUtil::SecondsToNanos(60), // rpc_timeout - TimeUtil::MillisToNanos(100), // pause - tries, // max retries - 1); // start log errors count - - return std::make_shared<MockAsyncConnection>(connection_conf, retry_timer, cpu_executor_, - io_executor_, retry_executor_, rpc_client, - region_locator); -} - -template <typename ACTION> -std::vector<std::shared_ptr<hbase::Row>> getRows(std::vector<ACTION> actions) { - std::vector<std::shared_ptr<hbase::Row>> rows; - for (auto action : actions) { - std::shared_ptr<hbase::Row> srow = std::make_shared<ACTION>(action); - rows.push_back(srow); - } - return rows; -} - -template <typename REQ, typename RESP> -std::vector<std::shared_ptr<hbase::Result>> getResults(std::vector<REQ> &actions, - std::vector<folly::Try<RESP>> &tresults) { - std::vector<std::shared_ptr<hbase::Result>> results{}; - uint64_t num = 0; - for (auto tresult : tresults) { - if (tresult.hasValue()) { - results.push_back(tresult.value()); - } else if (tresult.hasException()) { - folly::exception_wrapper ew = tresult.exception(); - LOG(ERROR) << "Caught exception:- " << ew.what().toStdString() << " for " - << actions[num].row(); - throw ew; - } - ++num; - } 
- return results; -} - -template <typename ACTION> -std::map<std::string, std::shared_ptr<RegionLocation>> getRegionLocationsAndActions( - uint64_t num_rows, std::vector<ACTION> &actions, std::shared_ptr<Table> table) { - std::map<std::string, std::shared_ptr<RegionLocation>> region_locations; - for (uint64_t i = 0; i < num_rows; ++i) { - auto row = "test" + std::to_string(i); - ACTION action(row); - actions.push_back(action); - region_locations[row] = table->GetRegionLocation(row); - } - return region_locations; -} - -void runMultiGets(std::shared_ptr<AsyncRegionLocatorBase> region_locator, - const std::string &table_name, bool split_regions, uint32_t tries = 3, - uint32_t operation_timeout_millis = 600000, uint64_t num_rows = 1000) { - // Create TableName and Row to be fetched from HBase - auto tn = folly::to<hbase::pb::TableName>(AsyncBatchRpcRetryTest::tableName); - - // Create a client - Client client(*AsyncBatchRpcRetryTest::test_util->conf()); - - // Get connection to HBase Table - std::shared_ptr<Table> table = client.Table(tn); - - for (uint64_t i = 0; i < num_rows; i++) { - table->Put(Put{"test" + std::to_string(i)}.AddColumn("d", std::to_string(i), - "value" + std::to_string(i))); - } - std::vector<hbase::Get> gets; - auto region_locations = getRegionLocationsAndActions<hbase::Get>(num_rows, gets, table); - - /* set region locator */ - region_locator->set_region_location(region_locations); - - /* init hbase client connection */ - auto conn = getAsyncConnection(client, operation_timeout_millis, tries, region_locator); - conn->Init(); - - /* init retry caller factory */ - auto tableImpl = - std::make_shared<MockRawAsyncTableImpl>(conn, std::make_shared<hbase::pb::TableName>(tn)); - - std::vector<std::shared_ptr<hbase::Row>> rows = getRows<hbase::Get>(gets); - auto tresults = tableImpl->Batch<std::shared_ptr<hbase::Row>, std::shared_ptr<Result>>(rows).get( - milliseconds(operation_timeout_millis)); - ASSERT_TRUE(!tresults.empty()) << "tresults shouldn't be 
empty."; - - auto results = getResults<hbase::Get, std::shared_ptr<Result>>(gets, tresults); - // Test the values, should be same as in put executed on hbase shell - ASSERT_TRUE(!results.empty()) << "Results shouldn't be empty."; - uint32_t i = 0; - for (; i < num_rows; ++i) { - ASSERT_TRUE(!results[i]->IsEmpty()) << "Result for Get " << gets[i].row() - << " must not be empty"; - EXPECT_EQ("test" + std::to_string(i), results[i]->Row()); - EXPECT_EQ("value" + std::to_string(i), results[i]->Value("d", std::to_string(i)).value()); - } - - table->Close(); - client.Close(); - conn->Close(); -} - -void runMultiPuts(std::shared_ptr<AsyncRegionLocatorBase> region_locator, - const std::string &table_name, bool split_regions, uint32_t tries = 3, - uint32_t operation_timeout_millis = 600000, uint32_t num_rows = 1000) { - // Create TableName and Row to be fetched from HBase - auto tn = folly::to<hbase::pb::TableName>(AsyncBatchRpcRetryTest::tableName); - - // Create a client - Client client(*AsyncBatchRpcRetryTest::test_util->conf()); - - // Get connection to HBase Table - std::shared_ptr<Table> table = client.Table(tn); - - std::vector<hbase::Put> puts; - auto region_locations = getRegionLocationsAndActions<hbase::Put>(num_rows, puts, table); - - /* set region locator */ - region_locator->set_region_location(region_locations); - - /* init hbase client connection */ - auto conn = getAsyncConnection(client, operation_timeout_millis, tries, region_locator); - conn->Init(); - - /* init retry caller factory */ - auto tableImpl = - std::make_shared<MockRawAsyncTableImpl>(conn, std::make_shared<hbase::pb::TableName>(tn)); - - std::vector<std::shared_ptr<hbase::Row>> rows = getRows<hbase::Put>(puts); - auto tresults = tableImpl->Batch<std::shared_ptr<hbase::Row>, std::shared_ptr<Result>>(rows).get( - milliseconds(operation_timeout_millis)); - ASSERT_TRUE(!tresults.empty()) << "tresults shouldn't be empty."; - - auto results = getResults<hbase::Put, std::shared_ptr<Result>>(puts, 
tresults); - // Test the values, should be same as in put executed on hbase shell - ASSERT_TRUE(!results.empty()) << "Results shouldn't be empty."; - - table->Close(); - client.Close(); - conn->Close(); -} - -// Test successful case -TEST_F(AsyncBatchRpcRetryTest, MultiGets) { - std::shared_ptr<AsyncRegionLocatorBase> region_locator( - std::make_shared<MockAsyncRegionLocator>()); - runMultiGets(region_locator, "table1", false); -} - -// Tests the RPC failing 3 times, then succeeding -TEST_F(AsyncBatchRpcRetryTest, HandleException) { - std::shared_ptr<AsyncRegionLocatorBase> region_locator( - std::make_shared<MockWrongRegionAsyncRegionLocator>(3)); - runMultiGets(region_locator, "table2", false, 5); -} - -// Tests the RPC failing 4 times, throwing an exception -TEST_F(AsyncBatchRpcRetryTest, FailWithException) { - std::shared_ptr<AsyncRegionLocatorBase> region_locator( - std::make_shared<MockWrongRegionAsyncRegionLocator>(4)); - EXPECT_ANY_THROW(runMultiGets(region_locator, "table3", false)); -} - -// Tests the region location lookup failing 3 times, then succeeding -TEST_F(AsyncBatchRpcRetryTest, HandleExceptionFromRegionLocationLookup) { - std::shared_ptr<AsyncRegionLocatorBase> region_locator( - std::make_shared<MockFailingAsyncRegionLocator>(3)); - runMultiGets(region_locator, "table4", false); -} - -// Tests the region location lookup failing 5 times, throwing an exception -TEST_F(AsyncBatchRpcRetryTest, FailWithExceptionFromRegionLocationLookup) { - std::shared_ptr<AsyncRegionLocatorBase> region_locator( - std::make_shared<MockFailingAsyncRegionLocator>(4)); - EXPECT_ANY_THROW(runMultiGets(region_locator, "table5", false, 3)); -} - -// Tests hitting operation timeout, thus not retrying anymore -TEST_F(AsyncBatchRpcRetryTest, FailWithOperationTimeout) { - std::shared_ptr<AsyncRegionLocatorBase> region_locator( - std::make_shared<MockFailingAsyncRegionLocator>(6)); - EXPECT_ANY_THROW(runMultiGets(region_locator, "table6", false, 5, 100, 1000)); -} - 
-////////////////////// -// Test successful case -TEST_F(AsyncBatchRpcRetryTest, MultiPuts) { - std::shared_ptr<AsyncRegionLocatorBase> region_locator( - std::make_shared<MockAsyncRegionLocator>()); - runMultiPuts(region_locator, "table1", false); -} - -// Tests the RPC failing 3 times, then succeeding -TEST_F(AsyncBatchRpcRetryTest, PutsHandleException) { - std::shared_ptr<AsyncRegionLocatorBase> region_locator( - std::make_shared<MockWrongRegionAsyncRegionLocator>(3)); - runMultiPuts(region_locator, "table2", false, 5); -} - -// Tests the RPC failing 4 times, throwing an exception -TEST_F(AsyncBatchRpcRetryTest, PutsFailWithException) { - std::shared_ptr<AsyncRegionLocatorBase> region_locator( - std::make_shared<MockWrongRegionAsyncRegionLocator>(4)); - EXPECT_ANY_THROW(runMultiPuts(region_locator, "table3", false)); -} - -// Tests the region location lookup failing 3 times, then succeeding -TEST_F(AsyncBatchRpcRetryTest, PutsHandleExceptionFromRegionLocationLookup) { - std::shared_ptr<AsyncRegionLocatorBase> region_locator( - std::make_shared<MockFailingAsyncRegionLocator>(3)); - runMultiPuts(region_locator, "table4", false); -} - -// Tests the region location lookup failing 5 times, throwing an exception -TEST_F(AsyncBatchRpcRetryTest, PutsFailWithExceptionFromRegionLocationLookup) { - std::shared_ptr<AsyncRegionLocatorBase> region_locator( - std::make_shared<MockFailingAsyncRegionLocator>(4)); - EXPECT_ANY_THROW(runMultiPuts(region_locator, "table5", false, 3)); -} - -// Tests hitting operation timeout, thus not retrying anymore -TEST_F(AsyncBatchRpcRetryTest, PutsFailWithOperationTimeout) { - std::shared_ptr<AsyncRegionLocatorBase> region_locator( - std::make_shared<MockFailingAsyncRegionLocator>(6)); - EXPECT_ANY_THROW(runMultiPuts(region_locator, "table6", false, 5, 100, 1000)); -} - - // Test successful case - TEST_F(AsyncBatchRpcRetryTest, MultiGetsSplitRegions) { - std::shared_ptr<AsyncRegionLocatorBase> region_locator( - 
std::make_shared<MockAsyncRegionLocator>()); - runMultiGets(region_locator, "table7", true); - } - - // Tests the RPC failing 3 times, then succeeding - TEST_F(AsyncBatchRpcRetryTest, HandleExceptionSplitRegions) { - std::shared_ptr<AsyncRegionLocatorBase> region_locator( - std::make_shared<MockWrongRegionAsyncRegionLocator>(3)); - runMultiGets(region_locator, "table8", true, 5); - } - - // Tests the RPC failing 4 times, throwing an exception - TEST_F(AsyncBatchRpcRetryTest, FailWithExceptionSplitRegions) { - std::shared_ptr<AsyncRegionLocatorBase> region_locator( - std::make_shared<MockWrongRegionAsyncRegionLocator>(4)); - EXPECT_ANY_THROW(runMultiGets(region_locator, "table9", true)); - } - - // Tests the region location lookup failing 3 times, then succeeding - TEST_F(AsyncBatchRpcRetryTest, HandleExceptionFromRegionLocationLookupSplitRegions) { - std::shared_ptr<AsyncRegionLocatorBase> region_locator( - std::make_shared<MockFailingAsyncRegionLocator>(3)); - runMultiGets(region_locator, "table10", true); - } - - // Tests the region location lookup failing 5 times, throwing an exception - TEST_F(AsyncBatchRpcRetryTest, FailWithExceptionFromRegionLocationLookupSplitRegions) { - std::shared_ptr<AsyncRegionLocatorBase> region_locator( - std::make_shared<MockFailingAsyncRegionLocator>(4)); - EXPECT_ANY_THROW(runMultiGets(region_locator, "table11", true, 3)); - } - - // Tests hitting operation timeout, thus not retrying anymore - TEST_F(AsyncBatchRpcRetryTest, FailWithOperationTimeoutSplitRegions) { - std::shared_ptr<AsyncRegionLocatorBase> region_locator( - std::make_shared<MockFailingAsyncRegionLocator>(6)); - EXPECT_ANY_THROW(runMultiGets(region_locator, "table12", true, 5, 100, 1000)); - } http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/core/async-client-scanner.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/async-client-scanner.cc 
b/hbase-native-client/core/async-client-scanner.cc deleted file mode 100644 index 720ab25..0000000 --- a/hbase-native-client/core/async-client-scanner.cc +++ /dev/null @@ -1,142 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "core/async-client-scanner.h" - -#include <algorithm> -#include <iterator> -#include <limits> -#include <stdexcept> - -namespace hbase { - -AsyncClientScanner::AsyncClientScanner( - std::shared_ptr<AsyncConnection> conn, std::shared_ptr<Scan> scan, - std::shared_ptr<pb::TableName> table_name, std::shared_ptr<RawScanResultConsumer> consumer, - nanoseconds pause, uint32_t max_retries, nanoseconds scan_timeout_nanos, - nanoseconds rpc_timeout_nanos, uint32_t start_log_errors_count) - : conn_(conn), - scan_(scan), - table_name_(table_name), - consumer_(consumer), - pause_(pause), - max_retries_(max_retries), - scan_timeout_nanos_(scan_timeout_nanos), - rpc_timeout_nanos_(rpc_timeout_nanos), - start_log_errors_count_(start_log_errors_count) { - results_cache_ = std::make_shared<ScanResultCache>(); - max_attempts_ = ConnectionUtils::Retries2Attempts(max_retries); -} - -void AsyncClientScanner::Start() { OpenScanner(); } - 
-folly::Future<std::shared_ptr<OpenScannerResponse>> AsyncClientScanner::CallOpenScanner( - std::shared_ptr<hbase::RpcClient> rpc_client, - std::shared_ptr<hbase::HBaseRpcController> controller, - std::shared_ptr<hbase::RegionLocation> loc) { - open_scanner_tries_++; - - auto preq = RequestConverter::ToScanRequest(*scan_, loc->region_name(), scan_->Caching(), false); - - auto self(shared_from_this()); - VLOG(5) << "Calling RPC Client to open the scanner for region:" << loc->DebugString(); - return rpc_client - ->AsyncCall(loc->server_name().host_name(), loc->server_name().port(), std::move(preq), - security::User::defaultUser(), "ClientService") - .then([self, loc, controller, rpc_client](const std::unique_ptr<Response>& presp) { - VLOG(5) << "Scan Response:" << presp->DebugString(); - return std::make_shared<OpenScannerResponse>(rpc_client, presp, loc, controller); - }); -} - -void AsyncClientScanner::OpenScanner() { - auto self(shared_from_this()); - open_scanner_tries_ = 1; - - auto caller = conn_->caller_factory() - ->Single<std::shared_ptr<OpenScannerResponse>>() - ->table(table_name_) - ->row(scan_->StartRow()) - ->locate_type(GetLocateType(*scan_)) - ->rpc_timeout(rpc_timeout_nanos_) - ->operation_timeout(scan_timeout_nanos_) - ->pause(pause_) - ->max_retries(max_retries_) - ->start_log_errors_count(start_log_errors_count_) - ->action([&](std::shared_ptr<hbase::HBaseRpcController> controller, - std::shared_ptr<hbase::RegionLocation> loc, - std::shared_ptr<hbase::RpcClient> rpc_client) - -> folly::Future<std::shared_ptr<OpenScannerResponse>> { - return CallOpenScanner(rpc_client, controller, loc); - }) - ->Build(); - - caller->Call() - .then([this, self](std::shared_ptr<OpenScannerResponse> resp) { - VLOG(3) << "Opened scanner with id:" << resp->scan_resp_->scanner_id() - << ", region:" << resp->region_location_->DebugString() << ", starting scan"; - StartScan(resp); - }) - .onError([this, self](const folly::exception_wrapper& e) { - VLOG(3) << "Open scan 
request received error:" << e.what(); - consumer_->OnError(e); - }) - .then([caller, self](const auto r) { return r; }); -} - -void AsyncClientScanner::StartScan(std::shared_ptr<OpenScannerResponse> resp) { - auto self(shared_from_this()); - auto caller = conn_->caller_factory() - ->Scan() - ->scanner_id(resp->scan_resp_->scanner_id()) - ->region_location(resp->region_location_) - ->scanner_lease_timeout(TimeUtil::MillisToNanos(resp->scan_resp_->ttl())) - ->scan(scan_) - ->rpc_client(resp->rpc_client_) - ->consumer(consumer_) - ->results_cache(results_cache_) - ->rpc_timeout(rpc_timeout_nanos_) - ->scan_timeout(scan_timeout_nanos_) - ->pause(pause_) - ->max_retries(max_retries_) - ->start_log_errors_count(start_log_errors_count_) - ->Build(); - - caller->Start(resp->controller_, resp->scan_resp_, resp->cell_scanner_) - .then([caller, self](const bool has_more) { - if (has_more) { - // open the next scanner on the next region. - self->OpenScanner(); - } else { - self->consumer_->OnComplete(); - } - }) - .onError([caller, self](const folly::exception_wrapper& e) { self->consumer_->OnError(e); }) - .then([caller, self](const auto r) { return r; }); -} - -RegionLocateType AsyncClientScanner::GetLocateType(const Scan& scan) { - // TODO: In C++, there is no Scan::IncludeStartRow() and Scan::IncludeStopRow(). 
- // When added, this method should be modified to return other RegionLocateTypes - // (see ConnectionUtils.java #getLocateType()) - // TODO: When reversed scans are implemented, return other RegionLocateTypes - return RegionLocateType::kCurrent; -} - -} // namespace hbase http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/core/async-client-scanner.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/async-client-scanner.h b/hbase-native-client/core/async-client-scanner.h deleted file mode 100644 index 8663468..0000000 --- a/hbase-native-client/core/async-client-scanner.h +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- * - */ -#pragma once - -#include <folly/Format.h> -#include <folly/Logging.h> -#include <folly/futures/Future.h> -#include <folly/io/async/EventBase.h> -#include <folly/io/async/HHWheelTimer.h> - -#include <algorithm> -#include <chrono> -#include <functional> -#include <memory> -#include <string> -#include <type_traits> -#include <utility> -#include <vector> - -#include "connection/rpc-client.h" -#include "core/async-connection.h" -#include "core/async-rpc-retrying-caller-factory.h" -#include "core/async-rpc-retrying-caller.h" -#include "core/hbase-rpc-controller.h" -#include "core/raw-scan-result-consumer.h" -#include "core/region-location.h" -#include "core/request-converter.h" -#include "core/response-converter.h" -#include "core/result.h" -#include "core/scan-result-cache.h" -#include "core/scan.h" -#include "exceptions/exception.h" -#include "if/Client.pb.h" -#include "if/HBase.pb.h" -#include "utils/connection-util.h" -#include "utils/sys-util.h" -#include "utils/time-util.h" - -using std::chrono::nanoseconds; -using std::chrono::milliseconds; - -namespace hbase { -class OpenScannerResponse { - public: - OpenScannerResponse(std::shared_ptr<hbase::RpcClient> rpc_client, - const std::unique_ptr<Response>& resp, - std::shared_ptr<RegionLocation> region_location, - std::shared_ptr<hbase::HBaseRpcController> controller) - : rpc_client_(rpc_client), region_location_(region_location), controller_(controller) { - scan_resp_ = std::static_pointer_cast<pb::ScanResponse>(resp->resp_msg()); - cell_scanner_ = resp->cell_scanner(); - } - std::shared_ptr<hbase::RpcClient> rpc_client_; - std::shared_ptr<pb::ScanResponse> scan_resp_; - std::shared_ptr<RegionLocation> region_location_; - std::shared_ptr<hbase::HBaseRpcController> controller_; - std::shared_ptr<CellScanner> cell_scanner_; -}; - -class AsyncClientScanner : public std::enable_shared_from_this<AsyncClientScanner> { - public: - template <typename... T> - static std::shared_ptr<AsyncClientScanner> Create(T&&... 
all) { - return std::shared_ptr<AsyncClientScanner>(new AsyncClientScanner(std::forward<T>(all)...)); - } - - void Start(); - - private: - // methods - AsyncClientScanner(std::shared_ptr<AsyncConnection> conn, std::shared_ptr<Scan> scan, - std::shared_ptr<pb::TableName> table_name, - std::shared_ptr<RawScanResultConsumer> consumer, nanoseconds pause, - uint32_t max_retries, nanoseconds scan_timeout_nanos, - nanoseconds rpc_timeout_nanos, uint32_t start_log_errors_count); - - folly::Future<std::shared_ptr<OpenScannerResponse>> CallOpenScanner( - std::shared_ptr<hbase::RpcClient> rpc_client, - std::shared_ptr<hbase::HBaseRpcController> controller, - std::shared_ptr<hbase::RegionLocation> loc); - - void OpenScanner(); - - void StartScan(std::shared_ptr<OpenScannerResponse> resp); - - RegionLocateType GetLocateType(const Scan& scan); - - private: - // data - std::shared_ptr<AsyncConnection> conn_; - std::shared_ptr<Scan> scan_; - std::shared_ptr<pb::TableName> table_name_; - std::shared_ptr<ScanResultCache> results_cache_; - std::shared_ptr<RawScanResultConsumer> consumer_; - nanoseconds pause_; - uint32_t max_retries_; - nanoseconds scan_timeout_nanos_; - nanoseconds rpc_timeout_nanos_; - uint32_t start_log_errors_count_; - uint32_t max_attempts_; - uint32_t open_scanner_tries_ = 0; -}; -} // namespace hbase http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/core/async-connection.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/async-connection.cc b/hbase-native-client/core/async-connection.cc deleted file mode 100644 index 850fb8f..0000000 --- a/hbase-native-client/core/async-connection.cc +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "core/async-connection.h" -#include "core/async-rpc-retrying-caller-factory.h" - -namespace hbase { - -void AsyncConnectionImpl::Init() { - connection_conf_ = std::make_shared<hbase::ConnectionConfiguration>(*conf_); - // start thread pools - auto io_threads = conf_->GetInt(kClientIoThreadPoolSize, sysconf(_SC_NPROCESSORS_ONLN)); - auto cpu_threads = conf_->GetInt(kClientCpuThreadPoolSize, 2 * sysconf(_SC_NPROCESSORS_ONLN)); - cpu_executor_ = std::make_shared<wangle::CPUThreadPoolExecutor>(cpu_threads); - io_executor_ = std::make_shared<wangle::IOThreadPoolExecutor>(io_threads); - /* - * We need a retry_executor for a thread pool of size 1 due to a possible bug in wangle/folly. - * Otherwise, Assertion 'isInEventBaseThread()' always fails. See the comments - * in async-rpc-retrying-caller.cc. 
- */ - retry_executor_ = std::make_shared<wangle::IOThreadPoolExecutor>(1); - retry_timer_ = folly::HHWheelTimer::newTimer(retry_executor_->getEventBase()); - - std::shared_ptr<Codec> codec = nullptr; - if (conf_->Get(kRpcCodec, hbase::KeyValueCodec::kJavaClassName) == - std::string(KeyValueCodec::kJavaClassName)) { - codec = std::make_shared<hbase::KeyValueCodec>(); - } else { - LOG(WARNING) << "Not using RPC Cell Codec"; - } - rpc_client_ = std::make_shared<hbase::RpcClient>(io_executor_, cpu_executor_, codec, conf_, - connection_conf_->connect_timeout()); - location_cache_ = std::make_shared<hbase::LocationCache>(conf_, io_executor_, cpu_executor_, - rpc_client_->connection_pool()); - caller_factory_ = - std::make_shared<AsyncRpcRetryingCallerFactory>(shared_from_this(), retry_timer_); -} - -// We can't have the threads continue running after everything is done -// that leads to an error. -AsyncConnectionImpl::~AsyncConnectionImpl() { Close(); } - -void AsyncConnectionImpl::Close() { - if (is_closed_) return; - - cpu_executor_->stop(); - io_executor_->stop(); - retry_executor_->stop(); - retry_timer_->destroy(); - if (rpc_client_.get()) rpc_client_->Close(); - is_closed_ = true; -} - -} // namespace hbase http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/core/async-connection.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/async-connection.h b/hbase-native-client/core/async-connection.h deleted file mode 100644 index 7b260a5..0000000 --- a/hbase-native-client/core/async-connection.h +++ /dev/null @@ -1,121 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#pragma once - -#include <folly/futures/Future.h> -#include <folly/io/IOBuf.h> -#include <wangle/concurrent/CPUThreadPoolExecutor.h> -#include <wangle/concurrent/IOThreadPoolExecutor.h> - -#include <memory> -#include <string> -#include <utility> - -#include "connection/rpc-client.h" -#include "core/async-region-locator.h" -#include "core/configuration.h" -#include "core/connection-configuration.h" -#include "core/hbase-configuration-loader.h" -#include "core/hbase-rpc-controller.h" -#include "core/keyvalue-codec.h" -#include "core/location-cache.h" -#include "if/Cell.pb.h" -#include "serde/table-name.h" - -namespace hbase { - -class AsyncRpcRetryingCallerFactory; - -class AsyncConnection { - public: - AsyncConnection() {} - virtual ~AsyncConnection() {} - virtual std::shared_ptr<Configuration> conf() = 0; - virtual std::shared_ptr<ConnectionConfiguration> connection_conf() = 0; - virtual std::shared_ptr<AsyncRpcRetryingCallerFactory> caller_factory() = 0; - virtual std::shared_ptr<RpcClient> rpc_client() = 0; - virtual std::shared_ptr<AsyncRegionLocator> region_locator() = 0; - virtual std::shared_ptr<HBaseRpcController> CreateRpcController() = 0; - virtual std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor() = 0; - virtual std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor() = 0; - virtual std::shared_ptr<wangle::IOThreadPoolExecutor> retry_executor() = 0; - 
virtual void Close() = 0; -}; - -class AsyncConnectionImpl : public AsyncConnection, - public std::enable_shared_from_this<AsyncConnectionImpl> { - public: - virtual ~AsyncConnectionImpl(); - - // See https://mortoray.com/2013/08/02/safely-using-enable_shared_from_this/ - template <typename... T> - static std::shared_ptr<AsyncConnectionImpl> Create(T&&... all) { - auto conn = - std::shared_ptr<AsyncConnectionImpl>(new AsyncConnectionImpl(std::forward<T>(all)...)); - conn->Init(); - return conn; - } - - std::shared_ptr<Configuration> conf() override { return conf_; } - std::shared_ptr<ConnectionConfiguration> connection_conf() override { return connection_conf_; } - std::shared_ptr<AsyncRpcRetryingCallerFactory> caller_factory() override { - return caller_factory_; - } - std::shared_ptr<RpcClient> rpc_client() override { return rpc_client_; } - std::shared_ptr<LocationCache> location_cache() { return location_cache_; } - std::shared_ptr<AsyncRegionLocator> region_locator() override { return location_cache_; } - std::shared_ptr<HBaseRpcController> CreateRpcController() override { - return std::make_shared<HBaseRpcController>(); - } - std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor() override { return cpu_executor_; } - std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor() override { return io_executor_; } - std::shared_ptr<wangle::IOThreadPoolExecutor> retry_executor() override { - return retry_executor_; - } - - void Close() override; - - protected: - AsyncConnectionImpl() {} - - private: - /** Parameter name for HBase client IO thread pool size. Defaults to num cpus */ - static constexpr const char* kClientIoThreadPoolSize = "hbase.client.io.thread.pool.size"; - /** Parameter name for HBase client CPU thread pool size. Defaults to (2 * num cpus) */ - static constexpr const char* kClientCpuThreadPoolSize = "hbase.client.cpu.thread.pool.size"; - /** The RPC codec to encode cells. 
For now it is KeyValueCodec */ - static constexpr const char* kRpcCodec = "hbase.client.rpc.codec"; - - std::shared_ptr<Configuration> conf_; - std::shared_ptr<ConnectionConfiguration> connection_conf_; - std::shared_ptr<AsyncRpcRetryingCallerFactory> caller_factory_; - std::shared_ptr<folly::HHWheelTimer> retry_timer_; - std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor_; - std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor_; - std::shared_ptr<wangle::IOThreadPoolExecutor> retry_executor_; - std::shared_ptr<LocationCache> location_cache_; - std::shared_ptr<RpcClient> rpc_client_; - bool is_closed_ = false; - - private: - explicit AsyncConnectionImpl(std::shared_ptr<Configuration> conf) : conf_(conf) {} - void Init(); -}; -} // namespace hbase http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/core/async-region-locator.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/async-region-locator.h b/hbase-native-client/core/async-region-locator.h deleted file mode 100644 index f75cb7e..0000000 --- a/hbase-native-client/core/async-region-locator.h +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#pragma once - -#include <folly/ExceptionWrapper.h> -#include <folly/futures/Future.h> -#include <memory> -#include <string> - -#include "core/region-location.h" -#include "if/Client.pb.h" -#include "serde/region-info.h" -#include "serde/server-name.h" -#include "serde/table-name.h" - -namespace hbase { - -class AsyncRegionLocator { - public: - AsyncRegionLocator() {} - virtual ~AsyncRegionLocator() = default; - - /** - * The only method clients should use for meta lookups. If corresponding - * location is cached, it's returned from the cache, otherwise lookup - * in meta table is done, location is cached and then returned. - * It's expected that tiny fraction of invocations incurs meta scan. - * This method is to look up non-meta regions; use LocateMeta() to get the - * location of hbase:meta region. - * - * @param tn Table name of the table to look up. This object must live until - * after the future is returned - * - * @param row of the table to look up. This object must live until after the - * future is returned - */ - virtual folly::Future<std::shared_ptr<RegionLocation>> LocateRegion( - const hbase::pb::TableName &tn, const std::string &row, - const RegionLocateType locate_type = RegionLocateType::kCurrent, - const int64_t locate_ns = 0) = 0; - /** - * Update cached region location, possibly using the information from exception. 
- */ - virtual void UpdateCachedLocation(const RegionLocation &loc, - const folly::exception_wrapper &error) = 0; -}; - -} // namespace hbase http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/core/async-rpc-retrying-caller-factory.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/async-rpc-retrying-caller-factory.cc b/hbase-native-client/core/async-rpc-retrying-caller-factory.cc deleted file mode 100644 index 0ac9cac..0000000 --- a/hbase-native-client/core/async-rpc-retrying-caller-factory.cc +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "core/async-rpc-retrying-caller-factory.h" - -namespace hbase {} // namespace hbase