HBASE-18061 [C++] Fix retry logic in multi-get calls (Sudeep Sunthankar)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/a93c6a99 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/a93c6a99 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/a93c6a99 Branch: refs/heads/HBASE-14850 Commit: a93c6a99857dcc69a4c39e9603b54dbac271baa9 Parents: 9effc92 Author: Enis Soztutar <e...@apache.org> Authored: Wed Jul 19 11:50:40 2017 -0700 Committer: Enis Soztutar <e...@apache.org> Committed: Wed Jul 19 11:50:44 2017 -0700 ---------------------------------------------------------------------- hbase-native-client/core/BUCK | 11 + .../core/async-batch-rpc-retrying-caller.cc | 241 +++++----- .../core/async-batch-rpc-retrying-caller.h | 28 +- .../core/async-batch-rpc-retrying-test.cc | 463 +++++++++++++++++++ hbase-native-client/core/client-test.cc | 91 +++- hbase-native-client/core/multi-response.cc | 34 +- hbase-native-client/core/multi-response.h | 14 +- hbase-native-client/core/region-result.cc | 2 +- hbase-native-client/core/region-result.h | 5 +- hbase-native-client/core/response-converter.cc | 50 +- hbase-native-client/core/response-converter.h | 8 +- hbase-native-client/core/server-request.h | 10 +- 12 files changed, 753 insertions(+), 204 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/a93c6a99/hbase-native-client/core/BUCK ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/BUCK b/hbase-native-client/core/BUCK index 464c010..f9db0bd 100644 --- a/hbase-native-client/core/BUCK +++ b/hbase-native-client/core/BUCK @@ -323,6 +323,17 @@ cxx_test( ":core", ], run_test_separately=True,) +cxx_test( + name="multi-retry-test", + srcs=[ + "async-batch-rpc-retrying-test.cc", + ], + deps=[ + ":core", + "//test-util:test-util", + "//exceptions:exceptions", + ], + run_test_separately=True,) cxx_binary( name="simple-client", srcs=[ 
http://git-wip-us.apache.org/repos/asf/hbase/blob/a93c6a99/hbase-native-client/core/async-batch-rpc-retrying-caller.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/async-batch-rpc-retrying-caller.cc b/hbase-native-client/core/async-batch-rpc-retrying-caller.cc index 05290f5..0d67b17 100644 --- a/hbase-native-client/core/async-batch-rpc-retrying-caller.cc +++ b/hbase-native-client/core/async-batch-rpc-retrying-caller.cc @@ -77,20 +77,20 @@ int64_t AsyncBatchRpcRetryingCaller::RemainingTimeNs() { void AsyncBatchRpcRetryingCaller::LogException(int32_t tries, std::shared_ptr<RegionRequest> region_request, - std::shared_ptr<std::exception> &error, + const folly::exception_wrapper &ew, std::shared_ptr<ServerName> server_name) { if (tries > start_log_errors_count_) { std::string regions; regions += region_request->region_location()->region_name() + ", "; LOG(WARNING) << "Process batch for " << regions << " in " << table_name_->namespace_() << ":" << table_name_->qualifier() << " from " << server_name->host_name() - << " failed, tries=" << tries << ":- " << error->what(); + << " failed, tries=" << tries << ":- " << ew.what().toStdString(); } } void AsyncBatchRpcRetryingCaller::LogException( - int32_t tries, std::vector<std::shared_ptr<RegionRequest>> &region_requests, - std::shared_ptr<std::exception> &error, std::shared_ptr<ServerName> server_name) { + int32_t tries, const std::vector<std::shared_ptr<RegionRequest>> &region_requests, + const folly::exception_wrapper &ew, std::shared_ptr<ServerName> server_name) { if (tries > start_log_errors_count_) { std::string regions; for (const auto region_request : region_requests) { @@ -98,7 +98,7 @@ void AsyncBatchRpcRetryingCaller::LogException( } LOG(WARNING) << "Process batch for " << regions << " in " << table_name_->namespace_() << ":" << table_name_->qualifier() << " from " << server_name->host_name() - << " failed, tries=" << tries << error->what(); + << " failed, 
tries=" << tries << ew.what().toStdString(); } } @@ -107,27 +107,24 @@ const std::string AsyncBatchRpcRetryingCaller::GetExtraContextForError( return server_name ? server_name->ShortDebugString() : ""; } -// TODO HBASE-17800 pass folly ew instead of std::exception void AsyncBatchRpcRetryingCaller::AddError(const std::shared_ptr<Action> &action, - std::shared_ptr<std::exception> error, + const folly::exception_wrapper &ew, std::shared_ptr<ServerName> server_name) { - folly::exception_wrapper ew; ThrowableWithExtraContext twec(ew, TimeUtil::GetNowNanos(), GetExtraContextForError(server_name)); AddAction2Error(action->original_index(), twec); } void AsyncBatchRpcRetryingCaller::AddError(const std::vector<std::shared_ptr<Action>> &actions, - std::shared_ptr<std::exception> error, + const folly::exception_wrapper &ew, std::shared_ptr<ServerName> server_name) { for (const auto action : actions) { - AddError(action, error, server_name); + AddError(action, ew, server_name); } } -// TODO HBASE-17800 pass folly ew instead of std::exception void AsyncBatchRpcRetryingCaller::FailOne(const std::shared_ptr<Action> &action, int32_t tries, - std::shared_ptr<std::exception> error, - int64_t current_time, const std::string extras) { + const folly::exception_wrapper &ew, int64_t current_time, + const std::string extras) { auto action_index = action->original_index(); auto itr = action2promises_.find(action_index); if (itr != action2promises_.end()) { @@ -135,7 +132,6 @@ void AsyncBatchRpcRetryingCaller::FailOne(const std::shared_ptr<Action> &action, return; } } - folly::exception_wrapper ew; ThrowableWithExtraContext twec(ew, current_time, extras); AddAction2Error(action_index, twec); action2promises_[action_index].setException( @@ -143,10 +139,10 @@ void AsyncBatchRpcRetryingCaller::FailOne(const std::shared_ptr<Action> &action, } void AsyncBatchRpcRetryingCaller::FailAll(const std::vector<std::shared_ptr<Action>> &actions, - int32_t tries, std::shared_ptr<std::exception> error, + 
int32_t tries, const folly::exception_wrapper &ew, std::shared_ptr<ServerName> server_name) { for (const auto action : actions) { - FailOne(action, tries, error, TimeUtil::GetNowNanos(), GetExtraContextForError(server_name)); + FailOne(action, tries, ew, TimeUtil::GetNowNanos(), GetExtraContextForError(server_name)); } } @@ -159,7 +155,7 @@ void AsyncBatchRpcRetryingCaller::FailAll(const std::vector<std::shared_ptr<Acti return; } action2promises_[action_index].setException( - RetriesExhaustedException(tries - 1, action2errors_[action_index])); + RetriesExhaustedException(tries, action2errors_[action_index])); } } @@ -176,34 +172,32 @@ void AsyncBatchRpcRetryingCaller::AddAction2Error(uint64_t action_index, } void AsyncBatchRpcRetryingCaller::OnError(const ActionsByRegion &actions_by_region, int32_t tries, - std::shared_ptr<std::exception> exc, + const folly::exception_wrapper &ew, std::shared_ptr<ServerName> server_name) { std::vector<std::shared_ptr<Action>> copied_actions; std::vector<std::shared_ptr<RegionRequest>> region_requests; for (const auto &action_by_region : actions_by_region) { region_requests.push_back(action_by_region.second); - // Concurrent for (const auto &action : action_by_region.second->actions()) { copied_actions.push_back(action); } } - // TODO HBASE-17800 for exc check with DoNotRetryIOException - LogException(tries, region_requests, exc, server_name); - if (tries >= max_attempts_) { - FailAll(copied_actions, tries, exc, server_name); + + LogException(tries, region_requests, ew, server_name); + if ((tries >= max_attempts_) || !ExceptionUtil::ShouldRetry(ew)) { + FailAll(copied_actions, tries, ew, server_name); return; } - AddError(copied_actions, exc, server_name); + AddError(copied_actions, ew, server_name); TryResubmit(copied_actions, tries); } -void AsyncBatchRpcRetryingCaller::TryResubmit(std::vector<std::shared_ptr<Action>> actions, +void AsyncBatchRpcRetryingCaller::TryResubmit(const std::vector<std::shared_ptr<Action>> &actions, 
int32_t tries) { int64_t delay_ns; if (operation_timeout_ns_.count() > 0) { int64_t max_delay_ns = RemainingTimeNs() - ConnectionUtils::kSleepDeltaNs; if (max_delay_ns <= 0) { - VLOG(8) << "Fail All from onError"; FailAll(actions, tries); return; } @@ -211,9 +205,12 @@ void AsyncBatchRpcRetryingCaller::TryResubmit(std::vector<std::shared_ptr<Action } else { delay_ns = ConnectionUtils::GetPauseTime(pause_ns_.count(), tries - 1); } - // TODO This gives segfault @ present, when retried - // retry_timer_->scheduleTimeoutFn([&]() { GroupAndSend(actions, tries + 1); }, - // milliseconds(TimeUtil::ToMillis(delay_ns))); + + conn_->retry_executor()->add([=]() { + retry_timer_->scheduleTimeoutFn( + [=]() { conn_->cpu_executor()->add([=]() { GroupAndSend(actions, tries + 1); }); }, + milliseconds(TimeUtil::ToMillis(delay_ns))); + }); } Future<std::vector<Try<std::shared_ptr<RegionLocation>>>> @@ -234,7 +231,7 @@ void AsyncBatchRpcRetryingCaller::GroupAndSend(const std::vector<std::shared_ptr if (operation_timeout_ns_.count() > 0) { locate_timeout_ns = RemainingTimeNs(); if (locate_timeout_ns <= 0) { - FailAll(actions_, tries); + FailAll(actions, tries); return; } } else { @@ -242,8 +239,8 @@ void AsyncBatchRpcRetryingCaller::GroupAndSend(const std::vector<std::shared_ptr } GetRegionLocations(actions, locate_timeout_ns) - .then([&](std::vector<Try<std::shared_ptr<RegionLocation>>> &loc) { - std::lock_guard<std::mutex> lock(multi_mutex_); + .then([=](std::vector<Try<std::shared_ptr<RegionLocation>>> &loc) { + std::lock_guard<std::recursive_mutex> lck(multi_mutex_); ActionsByServer actions_by_server; std::vector<std::shared_ptr<Action>> locate_failed; @@ -252,50 +249,36 @@ void AsyncBatchRpcRetryingCaller::GroupAndSend(const std::vector<std::shared_ptr if (loc[i].hasValue()) { auto region_loc = loc[i].value(); // Add it to actions_by_server; - // Concurrent auto search = actions_by_server.find(std::make_shared<ServerName>(region_loc->server_name())); if (search != 
actions_by_server.end()) { search->second->AddActionsByRegion(region_loc, action); } else { - // Create new key auto server_request = std::make_shared<ServerRequest>(region_loc); server_request->AddActionsByRegion(region_loc, action); auto server_name = std::make_shared<ServerName>(region_loc->server_name()); actions_by_server[server_name] = server_request; } - locate_failed.push_back(action); - VLOG(8) << "row [" << action->action()->row() << "] of table[" - << table_name_->namespace_() << ":" << table_name_->qualifier() - << " found in region [" << region_loc->region_name() << "]; host[" - << region_loc->server_name().host_name() << "]; port[" + VLOG(5) << "rowkey [" << action->action()->row() << "] of table[" + << table_name_->ShortDebugString() << "] found in [" + << region_loc->region_name() << "]; RS[" + << region_loc->server_name().host_name() << ":" << region_loc->server_name().port() << "];"; } else if (loc[i].hasException()) { - VLOG(8) << "Exception occured while locating region:- " - << loc[i].exception().getCopied()->what() << " for action index " << i; - // TODO Feedback needed, Java API only identifies DoNotRetryIOException - // We might receive runtime error from location-cache.cc too, we are treating both same - if (loc[i].exception().is_compatible_with<std::runtime_error>()) { - std::string extra = ""; - FailOne(action, tries, nullptr, TimeUtil::GetNowNanos(), - loc[i].exception().what().toStdString()); - return; + folly::exception_wrapper ew = loc[i].exception(); + VLOG(1) << "GetRegionLocations() exception: " << ew.what().toStdString() + << "for index:" << i << "; tries: " << tries + << "; max_attempts_: " << max_attempts_; + // We might receive runtime error from location-cache.cc too, we are doing FailOne and + // continue next one + if (tries >= max_attempts_ || !ExceptionUtil::ShouldRetry(ew)) { + FailOne(action, tries, ew, TimeUtil::GetNowNanos(), ew.what().toStdString()); + } else { + AddError(action, loc[i].exception(), nullptr); + 
locate_failed.push_back(action); } - // TODO HBASE-17800 for exc check with DoNotRetryIOException - /* - else if (loc[i].exception().is_compatible_with<hbase::DoNotRetryIOException>()) { - int64_t current_time = 0; - std::string extra = ""; - FailOne(action, tries, nullptr, TimeUtil::GetNowNanos(), - loc[i].exception().what().toStdString()); - return; - }*/ - AddError(action, std::make_shared<std::exception>(*loc[i].exception().getCopied()), - nullptr); - locate_failed.push_back(action); } } - if (!actions_by_server.empty()) { Send(actions_by_server, tries); } @@ -304,17 +287,21 @@ void AsyncBatchRpcRetryingCaller::GroupAndSend(const std::vector<std::shared_ptr TryResubmit(locate_failed, tries); } }) - .onError([&](const folly::exception_wrapper &ew) { - std::lock_guard<std::mutex> lock(multi_mutex_); - auto exc = ew.getCopied(); - VLOG(8) << "GetRegionLocations() exception: " << ew.what().toStdString(); + .onError([=](const folly::exception_wrapper &ew) { + VLOG(1) << "GetRegionLocations() exception: " << ew.what().toStdString() + << "tries: " << tries << "; max_attempts_: " << max_attempts_; + std::lock_guard<std::recursive_mutex> lck(multi_mutex_); + if (tries >= max_attempts_ || !ExceptionUtil::ShouldRetry(ew)) { + FailAll(actions, tries, ew, nullptr); + } else { + TryResubmit(actions, tries); + } }); return; } Future<std::vector<Try<std::unique_ptr<Response>>>> AsyncBatchRpcRetryingCaller::GetMultiResponse( const ActionsByServer &actions_by_server) { - // Concurrent. 
auto multi_calls = std::vector<Future<std::unique_ptr<hbase::Response>>>{}; auto user = User::defaultUser(); for (const auto &action_by_server : actions_by_server) { @@ -328,16 +315,14 @@ Future<std::vector<Try<std::unique_ptr<Response>>>> AsyncBatchRpcRetryingCaller: return collectAll(multi_calls); } -void AsyncBatchRpcRetryingCaller::Send(ActionsByServer &actions_by_server, int32_t tries) { +void AsyncBatchRpcRetryingCaller::Send(const ActionsByServer &actions_by_server, int32_t tries) { int64_t remaining_ns; if (operation_timeout_ns_.count() > 0) { remaining_ns = RemainingTimeNs(); if (remaining_ns <= 0) { std::vector<std::shared_ptr<Action>> failed_actions; for (const auto &action_by_server : actions_by_server) { - // Concurrent for (auto &value : action_by_server.second->actions_by_region()) { - // Concurrent for (const auto &failed_action : value.second->actions()) { failed_actions.push_back(failed_action); } @@ -357,30 +342,30 @@ void AsyncBatchRpcRetryingCaller::Send(ActionsByServer &actions_by_server, int32 GetMultiResponse(actions_by_server) .then([=](const std::vector<Try<std::unique_ptr<hbase::Response>>> &completed_responses) { - std::lock_guard<std::mutex> lock(multi_mutex_); - for (uint64_t num = 0; num < completed_responses.size(); ++num) { + std::lock_guard<std::recursive_mutex> lck(multi_mutex_); + uint64_t num = 0; + for (const auto &action_by_server : actions_by_server) { if (completed_responses[num].hasValue()) { auto multi_response = - ResponseConverter::GetResults(multi_reqv[num], *completed_responses[num].value()); - for (const auto &action_by_server : actions_by_server) { - OnComplete(action_by_server.second->actions_by_region(), tries, - action_by_server.first, std::move(multi_response)); - } + ResponseConverter::GetResults(multi_reqv[num], *completed_responses[num].value(), + action_by_server.second->actions_by_region()); + OnComplete(action_by_server.second->actions_by_region(), tries, action_by_server.first, + 
std::move(multi_response)); } else if (completed_responses[num].hasException()) { - VLOG(8) << "Received exception: " - << completed_responses[num].exception().getCopied()->what() - << " from server for action index " << num; - // TODO: we should call OnError here as well. + folly::exception_wrapper ew = completed_responses[num].exception(); + VLOG(1) << "GetMultiResponse() exception: " << ew.what().toStdString() + << " from server for action index:" << num; + OnError(action_by_server.second->actions_by_region(), tries, ew, + action_by_server.first); } + num++; } }) .onError([=](const folly::exception_wrapper &ew) { - auto exc = ew.getCopied(); - VLOG(8) << "GetMultiResponse() exception: " << ew.what().toStdString(); - std::lock_guard<std::mutex> lock(multi_mutex_); + VLOG(1) << "GetMultiResponse() exception: " << ew.what().toStdString(); + std::lock_guard<std::recursive_mutex> lck(multi_mutex_); for (const auto &action_by_server : actions_by_server) { - OnError(action_by_server.second->actions_by_region(), tries, - std::make_shared<std::exception>(*exc), action_by_server.first); + OnError(action_by_server.second->actions_by_region(), tries, ew, action_by_server.first); } }); return; @@ -391,46 +376,35 @@ void AsyncBatchRpcRetryingCaller::OnComplete( const std::shared_ptr<ServerName> server_name, const std::unique_ptr<hbase::MultiResponse> multi_response) { std::vector<std::shared_ptr<Action>> failed_actions; + const auto region_results = multi_response->RegionResults(); for (const auto &action_by_region : actions_by_region) { - auto region_result_itr = multi_response->RegionResults().find(action_by_region.first); - if (region_result_itr == multi_response->RegionResults().end()) { - VLOG(8) << "Region " << action_by_region.first << " not found in MultiResults."; - // TODO Feedback needed Should we throw from here or continue for next action_by_region ? 
- // Throwing at present as this looks like an inconsistency - // Concurrent - auto exc = std::make_shared<std::runtime_error>("Invalid search for region " + - action_by_region.first + " in multi results"); - FailAll(action_by_region.second->actions(), tries, exc, server_name); - return; - // std::runtime_error( - // "Invalid search for region " + action_by_region.first + " in multi results"); - } - if (region_result_itr != multi_response->RegionResults().end()) { - // Concurrent + auto region_result_itr = region_results.find(action_by_region.first); + if (region_result_itr != region_results.end()) { for (const auto &action : action_by_region.second->actions()) { OnComplete(action, action_by_region.second, tries, server_name, region_result_itr->second, failed_actions); } - } else { + } else if (region_result_itr == region_results.end()) { auto region_exc = multi_response->RegionException(action_by_region.first); - std::shared_ptr<std::exception> pexc; if (region_exc == nullptr) { - VLOG(8) << "Server sent us neither results nor exceptions for " << action_by_region.first; - pexc = std::make_shared<std::exception>(std::runtime_error("Invalid response")); - // TODO: raise this exception to the application + // FailAll actions for this particular region as inconsistent server response. 
So we raise + // this exception to the application + std::string err_msg = "Invalid response: Server " + server_name->ShortDebugString() + + " sent us neither results nor exceptions for " + + action_by_region.first; + VLOG(1) << err_msg; + auto ew = folly::make_exception_wrapper<std::runtime_error>(err_msg); + FailAll(action_by_region.second->actions(), tries, ew, server_name); } else { - // TODO HBASE-17800 for exc check with DoNotRetryIOException - LogException(tries, action_by_region.second, region_exc, server_name); - location_cache_->UpdateCachedLocation(*action_by_region.second->region_location(), - *region_exc); - std::string row_name; - if (tries >= max_attempts_) { - // Concurrent - FailAll(action_by_region.second->actions(), tries, region_exc, server_name); + // Eg: org.apache.hadoop.hbase.NotServingRegionException: + LogException(tries, action_by_region.second, *region_exc, server_name); + if (tries >= max_attempts_ || !ExceptionUtil::ShouldRetry(*region_exc)) { + FailAll(action_by_region.second->actions(), tries, *region_exc, server_name); return; } - // Concurrent - AddError(action_by_region.second->actions(), region_exc, server_name); + location_cache_->UpdateCachedLocation(*action_by_region.second->region_location(), + *region_exc); + AddError(action_by_region.second->actions(), *region_exc, server_name); for (const auto &action : action_by_region.second->actions()) { failed_actions.push_back(action); } @@ -440,6 +414,7 @@ void AsyncBatchRpcRetryingCaller::OnComplete( if (!failed_actions.empty()) { TryResubmit(failed_actions, tries); } + return; } @@ -454,34 +429,36 @@ void AsyncBatchRpcRetryingCaller::OnComplete(const std::shared_ptr<Action> &acti auto result_or_exc = region_result->ResultOrException(action->original_index()); auto result = std::get<0>(*result_or_exc); auto exc = std::get<1>(*result_or_exc); - std::shared_ptr<std::exception> pexc; if (exc != nullptr) { - LogException(tries, region_request, exc, server_name); - if (tries >= 
max_attempts_) { - FailOne(action, tries, exc, TimeUtil::GetNowNanos(), GetExtraContextForError(server_name)); + LogException(tries, region_request, *exc, server_name); + if (tries >= max_attempts_ || !ExceptionUtil::ShouldRetry(*exc)) { + FailOne(action, tries, *exc, TimeUtil::GetNowNanos(), GetExtraContextForError(server_name)); } else { failed_actions.push_back(action); } } else if (result != nullptr) { action2promises_[action->original_index()].setValue(std::move(result)); } else { - VLOG(8) << "Server " << server_name->ShortDebugString() - << " sent us neither results nor exceptions for request @ index " - << action->original_index() << ", row " << action->action()->row() << " of " - << region_request->region_location()->region_name(); - err_msg = "Invalid response"; - AddError(action, std::make_shared<std::runtime_error>(err_msg), server_name); + std::string err_msg = "Invalid response: Server " + server_name->ShortDebugString() + + " sent us neither results nor exceptions for request @ index " + + std::to_string(action->original_index()) + ", row " + + action->action()->row() + " of " + + region_request->region_location()->region_name(); + VLOG(1) << err_msg; + auto ew = folly::make_exception_wrapper<std::runtime_error>(err_msg); + AddError(action, ew, server_name); failed_actions.push_back(action); } } catch (const std::out_of_range &oor) { - // TODO Feedback needed. Should we retry for he specific index again ? - // This should never occur, so we are throwing a std::runtime_error from here - VLOG(8) << "No ResultOrException found @ index " << action->original_index() << ", row " - << action->action()->row() << " of " - << region_request->region_location()->region_name(); - throw std::runtime_error("ResultOrException not present @ index " + - std::to_string(action->original_index())); + // This should never occur. Error in logic. Throwing std::runtime_error from here. 
Will be + // retried or failed + std::string err_msg = "ResultOrException not present @ index " + + std::to_string(action->original_index()) + ", row " + + action->action()->row() + " of " + + region_request->region_location()->region_name(); + throw std::runtime_error(err_msg); } return; } + } /* namespace hbase */ http://git-wip-us.apache.org/repos/asf/hbase/blob/a93c6a99/hbase-native-client/core/async-batch-rpc-retrying-caller.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/async-batch-rpc-retrying-caller.h b/hbase-native-client/core/async-batch-rpc-retrying-caller.h index 29a0e6a..4c04b91 100644 --- a/hbase-native-client/core/async-batch-rpc-retrying-caller.h +++ b/hbase-native-client/core/async-batch-rpc-retrying-caller.h @@ -19,6 +19,7 @@ #pragma once +#include <folly/ExceptionWrapper.h> #include <folly/Format.h> #include <folly/futures/Future.h> #include <folly/futures/Promise.h> @@ -107,36 +108,36 @@ class AsyncBatchRpcRetryingCaller { int64_t RemainingTimeNs(); void LogException(int32_t tries, std::shared_ptr<RegionRequest> region_request, - std::shared_ptr<std::exception> &error, + const folly::exception_wrapper &ew, std::shared_ptr<pb::ServerName> server_name); - void LogException(int32_t tries, std::vector<std::shared_ptr<RegionRequest>> &region_requests, - std::shared_ptr<std::exception> &error, + void LogException(int32_t tries, + const std::vector<std::shared_ptr<RegionRequest>> &region_requests, + const folly::exception_wrapper &ew, std::shared_ptr<pb::ServerName> server_name); const std::string GetExtraContextForError(std::shared_ptr<pb::ServerName> server_name); - void AddError(const std::shared_ptr<Action> &action, std::shared_ptr<std::exception> error, + void AddError(const std::shared_ptr<Action> &action, const folly::exception_wrapper &ew, std::shared_ptr<pb::ServerName> server_name); void AddError(const std::vector<std::shared_ptr<Action>> &actions, - std::shared_ptr<std::exception> 
error, std::shared_ptr<pb::ServerName> server_name); + const folly::exception_wrapper &ew, std::shared_ptr<pb::ServerName> server_name); void FailOne(const std::shared_ptr<Action> &action, int32_t tries, - std::shared_ptr<std::exception> error, int64_t current_time, - const std::string extras); + const folly::exception_wrapper &ew, int64_t current_time, const std::string extras); void FailAll(const std::vector<std::shared_ptr<Action>> &actions, int32_t tries, - std::shared_ptr<std::exception> error, std::shared_ptr<pb::ServerName> server_name); + const folly::exception_wrapper &ew, std::shared_ptr<pb::ServerName> server_name); void FailAll(const std::vector<std::shared_ptr<Action>> &actions, int32_t tries); void AddAction2Error(uint64_t action_index, const ThrowableWithExtraContext &twec); void OnError(const ActionsByRegion &actions_by_region, int32_t tries, - std::shared_ptr<std::exception> exc, std::shared_ptr<pb::ServerName> server_name); + const folly::exception_wrapper &ew, std::shared_ptr<pb::ServerName> server_name); - void TryResubmit(std::vector<std::shared_ptr<Action>> actions, int32_t tries); + void TryResubmit(const std::vector<std::shared_ptr<Action>> &actions, int32_t tries); folly::Future<std::vector<folly::Try<std::shared_ptr<RegionLocation>>>> GetRegionLocations( const std::vector<std::shared_ptr<Action>> &actions, int64_t locate_timeout_ns); @@ -146,7 +147,7 @@ class AsyncBatchRpcRetryingCaller { folly::Future<std::vector<folly::Try<std::unique_ptr<Response>>>> GetMultiResponse( const ActionsByServer &actions_by_server); - void Send(ActionsByServer &actions_by_server, int32_t tries); + void Send(const ActionsByServer &actions_by_server, int32_t tries); void OnComplete(const ActionsByRegion &actions_by_region, int32_t tries, const std::shared_ptr<pb::ServerName> server_name, @@ -179,7 +180,6 @@ class AsyncBatchRpcRetryingCaller { std::shared_ptr<RpcClient> rpc_client_ = nullptr; std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_pool_ = nullptr; - 
std::mutex multi_mutex_; + std::recursive_mutex multi_mutex_; }; - -} /* namespace hbase */ +} // namespace hbase http://git-wip-us.apache.org/repos/asf/hbase/blob/a93c6a99/hbase-native-client/core/async-batch-rpc-retrying-test.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/async-batch-rpc-retrying-test.cc b/hbase-native-client/core/async-batch-rpc-retrying-test.cc new file mode 100644 index 0000000..c186276 --- /dev/null +++ b/hbase-native-client/core/async-batch-rpc-retrying-test.cc @@ -0,0 +1,463 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +#include <folly/Logging.h> +#include <folly/Memory.h> +#include <folly/futures/Future.h> +#include <folly/io/async/EventBase.h> +#include <folly/io/async/ScopedEventBaseThread.h> +#include <gtest/gtest.h> +#include <wangle/concurrent/IOThreadPoolExecutor.h> + +#include <chrono> +#include <functional> +#include <string> + +#include "connection/rpc-client.h" +#include "core/async-batch-rpc-retrying-caller.h" +#include "core/async-connection.h" +#include "core/async-rpc-retrying-caller-factory.h" +#include "core/client.h" +#include "core/connection-configuration.h" +#include "core/keyvalue-codec.h" +#include "core/region-location.h" +#include "core/result.h" +#include "exceptions/exception.h" +#include "test-util/test-util.h" +#include "utils/time-util.h" + +using hbase::AsyncRpcRetryingCallerFactory; +using hbase::AsyncConnection; +using hbase::AsyncRegionLocator; +using hbase::ConnectionConfiguration; +using hbase::Configuration; +using hbase::HBaseRpcController; +using hbase::RegionLocation; +using hbase::RegionLocateType; +using hbase::RpcClient; +using hbase::RequestConverter; +using hbase::ResponseConverter; +using hbase::Put; +using hbase::TimeUtil; +using hbase::Client; +using hbase::security::User; + +using std::chrono::nanoseconds; +using std::chrono::milliseconds; + +using namespace hbase; + +using folly::exception_wrapper; + +class AsyncBatchRpcRetryTest : public ::testing::Test { + public: + static std::unique_ptr<hbase::TestUtil> test_util; + static void SetUpTestCase() { + google::InstallFailureSignalHandler(); + test_util = std::make_unique<hbase::TestUtil>(); + test_util->StartMiniCluster(2); + } +}; +std::unique_ptr<hbase::TestUtil> AsyncBatchRpcRetryTest::test_util = nullptr; + +class AsyncRegionLocatorBase : public AsyncRegionLocator { + public: + AsyncRegionLocatorBase() {} + explicit AsyncRegionLocatorBase(std::shared_ptr<RegionLocation> region_location) + : region_location_(region_location) {} + virtual ~AsyncRegionLocatorBase() = 
default; + + folly::Future<std::shared_ptr<hbase::RegionLocation>> LocateRegion(const hbase::pb::TableName &, + const std::string &row, + const RegionLocateType, + const int64_t) override { + folly::Promise<std::shared_ptr<RegionLocation>> promise; + promise.setValue(region_locations_.at(row)); + return promise.getFuture(); + } + + virtual void set_region_location(std::shared_ptr<RegionLocation> region_location) { + region_location_ = region_location; + } + + virtual void set_region_location( + const std::map<std::string, std::shared_ptr<RegionLocation>> &reg_locs) { + for (auto reg_loc : reg_locs) { + region_locations_[reg_loc.first] = reg_loc.second; + } + } + + void UpdateCachedLocation(const RegionLocation &rl, const folly::exception_wrapper &ew) override { + } + + protected: + std::shared_ptr<RegionLocation> region_location_; + std::map<std::string, std::shared_ptr<RegionLocation>> region_locations_; + std::map<std::string, uint32_t> mtries_; + std::map<std::string, uint32_t> mnum_fails_; + + void InitRetryMaps(uint32_t num_fails) { + if (mtries_.size() == 0 && mnum_fails_.size() == 0) { + for (auto reg_loc : region_locations_) { + mtries_[reg_loc.first] = 0; + mnum_fails_[reg_loc.first] = num_fails; + } + } + } +}; + +class MockAsyncRegionLocator : public AsyncRegionLocatorBase { + public: + MockAsyncRegionLocator() : AsyncRegionLocatorBase() {} + explicit MockAsyncRegionLocator(std::shared_ptr<RegionLocation> region_location) + : AsyncRegionLocatorBase(region_location) {} + virtual ~MockAsyncRegionLocator() {} +}; + +class MockWrongRegionAsyncRegionLocator : public AsyncRegionLocatorBase { + private: + uint32_t counter_ = 0; + uint32_t num_fails_ = 0; + uint32_t tries_ = 0; + + public: + explicit MockWrongRegionAsyncRegionLocator(uint32_t num_fails) + : AsyncRegionLocatorBase(), num_fails_(num_fails) {} + explicit MockWrongRegionAsyncRegionLocator(std::shared_ptr<RegionLocation> region_location) + : AsyncRegionLocatorBase(region_location) {} + virtual 
~MockWrongRegionAsyncRegionLocator() {} + + folly::Future<std::shared_ptr<hbase::RegionLocation>> LocateRegion( + const hbase::pb::TableName &tn, const std::string &row, + const RegionLocateType locate_type = RegionLocateType::kCurrent, + const int64_t locate_ns = 0) override { + InitRetryMaps(num_fails_); + auto &tries = mtries_[row]; + auto &num_fails = mnum_fails_[row]; + if (++tries > num_fails) { + return AsyncRegionLocatorBase::LocateRegion(tn, row, locate_type, locate_ns); + } + + folly::Promise<std::shared_ptr<RegionLocation>> promise; + /* set random region name, simulating invalid region */ + auto result = std::make_shared<RegionLocation>("whatever-region-name", + region_locations_.at(row)->region_info(), + region_locations_.at(row)->server_name()); + promise.setValue(result); + return promise.getFuture(); + } +}; + +class MockFailingAsyncRegionLocator : public AsyncRegionLocatorBase { + private: + uint32_t tries_ = 0; + uint32_t num_fails_ = 0; + uint32_t counter_ = 0; + + public: + explicit MockFailingAsyncRegionLocator(uint32_t num_fails) + : AsyncRegionLocatorBase(), num_fails_(num_fails) {} + explicit MockFailingAsyncRegionLocator(std::shared_ptr<RegionLocation> region_location) + : AsyncRegionLocatorBase(region_location) {} + virtual ~MockFailingAsyncRegionLocator() {} + folly::Future<std::shared_ptr<hbase::RegionLocation>> LocateRegion( + const hbase::pb::TableName &tn, const std::string &row, + const RegionLocateType locate_type = RegionLocateType::kCurrent, + const int64_t locate_ns = 0) override { + InitRetryMaps(num_fails_); + auto &tries = mtries_[row]; + auto &num_fails = mnum_fails_[row]; + if (++tries > num_fails) { + return AsyncRegionLocatorBase::LocateRegion(tn, row, locate_type, locate_ns); + } + + folly::Promise<std::shared_ptr<RegionLocation>> promise; + promise.setException(std::runtime_error{"Failed to look up region location"}); + return promise.getFuture(); + } +}; + +class MockAsyncConnection : public AsyncConnection, + public 
std::enable_shared_from_this<MockAsyncConnection> { + public: + MockAsyncConnection(std::shared_ptr<ConnectionConfiguration> conn_conf, + std::shared_ptr<folly::HHWheelTimer> retry_timer, + std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor, + std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor, + std::shared_ptr<wangle::IOThreadPoolExecutor> retry_executor, + std::shared_ptr<RpcClient> rpc_client, + std::shared_ptr<AsyncRegionLocator> region_locator) + : conn_conf_(conn_conf), + retry_timer_(retry_timer), + cpu_executor_(cpu_executor), + io_executor_(io_executor), + retry_executor_(retry_executor), + rpc_client_(rpc_client), + region_locator_(region_locator) {} + ~MockAsyncConnection() {} + void Init() { + caller_factory_ = + std::make_shared<AsyncRpcRetryingCallerFactory>(shared_from_this(), retry_timer_); + } + + std::shared_ptr<Configuration> conf() override { return nullptr; } + std::shared_ptr<ConnectionConfiguration> connection_conf() override { return conn_conf_; } + std::shared_ptr<AsyncRpcRetryingCallerFactory> caller_factory() override { + return caller_factory_; + } + std::shared_ptr<RpcClient> rpc_client() override { return rpc_client_; } + std::shared_ptr<AsyncRegionLocator> region_locator() override { return region_locator_; } + std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor() override { return cpu_executor_; } + std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor() override { return io_executor_; } + std::shared_ptr<wangle::IOThreadPoolExecutor> retry_executor() override { + return retry_executor_; + } + + void Close() override {} + std::shared_ptr<HBaseRpcController> CreateRpcController() override { + return std::make_shared<HBaseRpcController>(); + } + + private: + std::shared_ptr<folly::HHWheelTimer> retry_timer_; + std::shared_ptr<ConnectionConfiguration> conn_conf_; + std::shared_ptr<AsyncRpcRetryingCallerFactory> caller_factory_; + std::shared_ptr<RpcClient> rpc_client_; + std::shared_ptr<AsyncRegionLocator> 
region_locator_; + std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor_; + std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor_; + std::shared_ptr<wangle::IOThreadPoolExecutor> retry_executor_; +}; + +class MockRawAsyncTableImpl { + public: + explicit MockRawAsyncTableImpl(std::shared_ptr<MockAsyncConnection> conn, + std::shared_ptr<hbase::pb::TableName> tn) + : conn_(conn), tn_(tn) {} + virtual ~MockRawAsyncTableImpl() = default; + + /* implement this in real RawAsyncTableImpl. */ + folly::Future<std::vector<folly::Try<std::shared_ptr<Result>>>> Gets( + const std::vector<hbase::Get> &gets) { + /* init request caller builder */ + auto builder = conn_->caller_factory()->Batch(); + + /* call with retry to get result */ + auto async_caller = + builder->table(tn_) + ->actions(std::make_shared<std::vector<hbase::Get>>(gets)) + ->rpc_timeout(conn_->connection_conf()->read_rpc_timeout()) + ->operation_timeout(conn_->connection_conf()->operation_timeout()) + ->pause(conn_->connection_conf()->pause()) + ->max_attempts(conn_->connection_conf()->max_retries()) + ->start_log_errors_count(conn_->connection_conf()->start_log_errors_count()) + ->Build(); + + return async_caller->Call().then([async_caller](auto r) { return r; }); + } + + private: + std::shared_ptr<MockAsyncConnection> conn_; + std::shared_ptr<hbase::pb::TableName> tn_; +}; + +void runMultiTest(std::shared_ptr<AsyncRegionLocatorBase> region_locator, + const std::string &table_name, bool split_regions, uint32_t tries = 3, + uint32_t operation_timeout_millis = 600000, uint32_t num_rows = 10000) { + std::vector<std::string> keys{"test0", "test100", "test200", "test300", "test400", + "test500", "test600", "test700", "test800", "test900"}; + std::string tableName = (split_regions) ? 
("split-" + table_name) : table_name; + if (split_regions) + AsyncBatchRpcRetryTest::test_util->CreateTable(tableName, "d", keys); + else + AsyncBatchRpcRetryTest::test_util->CreateTable(tableName, "d"); + + // Create TableName and Row to be fetched from HBase + auto tn = folly::to<hbase::pb::TableName>(tableName); + + // Create a client + Client client(*AsyncBatchRpcRetryTest::test_util->conf()); + + // Get connection to HBase Table + auto table = client.Table(tn); + ASSERT_TRUE(table) << "Unable to get connection to Table."; + + for (uint64_t i = 0; i < num_rows; i++) { + table->Put(Put{"test" + std::to_string(i)}.AddColumn("d", std::to_string(i), + "value" + std::to_string(i))); + } + + std::map<std::string, std::shared_ptr<RegionLocation>> region_locations; + std::vector<hbase::Get> gets; + for (uint64_t i = 0; i < num_rows; ++i) { + auto row = "test" + std::to_string(i); + hbase::Get get(row); + gets.push_back(get); + region_locations[row] = table->GetRegionLocation(row); + } + + /* init region location and rpc channel */ + auto cpu_executor_ = std::make_shared<wangle::CPUThreadPoolExecutor>(4); + auto io_executor_ = client.async_connection()->io_executor(); + auto retry_executor_ = std::make_shared<wangle::IOThreadPoolExecutor>(1); + auto codec = std::make_shared<hbase::KeyValueCodec>(); + auto rpc_client = + std::make_shared<RpcClient>(io_executor_, codec, AsyncBatchRpcRetryTest::test_util->conf()); + std::shared_ptr<folly::HHWheelTimer> retry_timer = + folly::HHWheelTimer::newTimer(retry_executor_->getEventBase()); + + /* init connection configuration */ + auto connection_conf = std::make_shared<ConnectionConfiguration>( + TimeUtil::SecondsToNanos(20), // connect_timeout + TimeUtil::MillisToNanos(operation_timeout_millis), // operation_timeout + TimeUtil::SecondsToNanos(60), // rpc_timeout + TimeUtil::MillisToNanos(100), // pause + tries, // max retries + 1); // start log errors count + + /* set region locator */ + 
region_locator->set_region_location(region_locations); + + /* init hbase client connection */ + auto conn = std::make_shared<MockAsyncConnection>(connection_conf, retry_timer, cpu_executor_, + io_executor_, retry_executor_, rpc_client, + region_locator); + conn->Init(); + + /* init retry caller factory */ + auto tableImpl = + std::make_shared<MockRawAsyncTableImpl>(conn, std::make_shared<hbase::pb::TableName>(tn)); + + auto tresults = tableImpl->Gets(gets).get(milliseconds(operation_timeout_millis)); + + ASSERT_TRUE(!tresults.empty()) << "tresults shouldn't be empty."; + std::vector<std::shared_ptr<hbase::Result>> results{}; + uint32_t num = 0; + for (auto tresult : tresults) { + if (tresult.hasValue()) { + results.push_back(tresult.value()); + } else if (tresult.hasException()) { + folly::exception_wrapper ew = tresult.exception(); + LOG(ERROR) << "Caught exception:- " << ew.what().toStdString() << " for " << gets[num].row(); + throw ew; + } + ++num; + } + + // Test the values, should be same as in put executed on hbase shell + ASSERT_TRUE(!results.empty()) << "Results shouldn't be empty."; + uint32_t i = 0; + for (; i < num_rows; ++i) { + ASSERT_TRUE(!results[i]->IsEmpty()) << "Result for Get " << gets[i].row() + << " must not be empty"; + EXPECT_EQ("test" + std::to_string(i), results[i]->Row()); + EXPECT_EQ("value" + std::to_string(i), results[i]->Value("d", std::to_string(i)).value()); + } + + retry_timer->destroy(); + table->Close(); + client.Close(); + retry_executor_->stop(); + io_executor_->stop(); + cpu_executor_->stop(); +} + +// Test successful case +TEST_F(AsyncBatchRpcRetryTest, MultiGets) { + std::shared_ptr<AsyncRegionLocatorBase> region_locator( + std::make_shared<MockAsyncRegionLocator>()); + runMultiTest(region_locator, "table1", false); +} + +// Tests the RPC failing 3 times, then succeeding +TEST_F(AsyncBatchRpcRetryTest, HandleException) { + std::shared_ptr<AsyncRegionLocatorBase> region_locator( + 
std::make_shared<MockWrongRegionAsyncRegionLocator>(3)); + runMultiTest(region_locator, "table2", false, 5); +} + +// Tests the RPC failing 4 times, throwing an exception +TEST_F(AsyncBatchRpcRetryTest, FailWithException) { + std::shared_ptr<AsyncRegionLocatorBase> region_locator( + std::make_shared<MockWrongRegionAsyncRegionLocator>(4)); + EXPECT_ANY_THROW(runMultiTest(region_locator, "table3", false)); +} + +// Tests the region location lookup failing 3 times, then succeeding +TEST_F(AsyncBatchRpcRetryTest, HandleExceptionFromRegionLocationLookup) { + std::shared_ptr<AsyncRegionLocatorBase> region_locator( + std::make_shared<MockFailingAsyncRegionLocator>(3)); + runMultiTest(region_locator, "table4", false); +} + +// Tests the region location lookup failing 4 times, throwing an exception +TEST_F(AsyncBatchRpcRetryTest, FailWithExceptionFromRegionLocationLookup) { + std::shared_ptr<AsyncRegionLocatorBase> region_locator( + std::make_shared<MockFailingAsyncRegionLocator>(4)); + EXPECT_ANY_THROW(runMultiTest(region_locator, "table5", false, 3)); +} + +// Tests hitting operation timeout, thus not retrying anymore +TEST_F(AsyncBatchRpcRetryTest, FailWithOperationTimeout) { + std::shared_ptr<AsyncRegionLocatorBase> region_locator( + std::make_shared<MockFailingAsyncRegionLocator>(6)); + EXPECT_ANY_THROW(runMultiTest(region_locator, "table6", false, 5, 100, 10000)); +} + +// Test successful case +TEST_F(AsyncBatchRpcRetryTest, MultiGetsSplitRegions) { + std::shared_ptr<AsyncRegionLocatorBase> region_locator( + std::make_shared<MockAsyncRegionLocator>()); + runMultiTest(region_locator, "table1", true); +} + +// Tests the RPC failing 3 times, then succeeding +TEST_F(AsyncBatchRpcRetryTest, HandleExceptionSplitRegions) { + std::shared_ptr<AsyncRegionLocatorBase> region_locator( + std::make_shared<MockWrongRegionAsyncRegionLocator>(3)); + runMultiTest(region_locator, "table2", true, 5); +} + +// Tests the RPC failing 4 times, throwing an exception 
+TEST_F(AsyncBatchRpcRetryTest, FailWithExceptionSplitRegions) { + std::shared_ptr<AsyncRegionLocatorBase> region_locator( + std::make_shared<MockWrongRegionAsyncRegionLocator>(4)); + EXPECT_ANY_THROW(runMultiTest(region_locator, "table3", true)); +} + +// Tests the region location lookup failing 3 times, then succeeding +TEST_F(AsyncBatchRpcRetryTest, HandleExceptionFromRegionLocationLookupSplitRegions) { + std::shared_ptr<AsyncRegionLocatorBase> region_locator( + std::make_shared<MockFailingAsyncRegionLocator>(3)); + runMultiTest(region_locator, "table4", true); +} + +// Tests the region location lookup failing 4 times, throwing an exception +TEST_F(AsyncBatchRpcRetryTest, FailWithExceptionFromRegionLocationLookupSplitRegions) { + std::shared_ptr<AsyncRegionLocatorBase> region_locator( + std::make_shared<MockFailingAsyncRegionLocator>(4)); + EXPECT_ANY_THROW(runMultiTest(region_locator, "table5", true, 3)); +} + +// Tests hitting operation timeout, thus not retrying anymore +TEST_F(AsyncBatchRpcRetryTest, FailWithOperationTimeoutSplitRegions) { + std::shared_ptr<AsyncRegionLocatorBase> region_locator( + std::make_shared<MockFailingAsyncRegionLocator>(6)); + EXPECT_ANY_THROW(runMultiTest(region_locator, "table6", true, 5, 100, 10000)); +} http://git-wip-us.apache.org/repos/asf/hbase/blob/a93c6a99/hbase-native-client/core/client-test.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/client-test.cc b/hbase-native-client/core/client-test.cc index ba213bd..9efe0b6 100644 --- a/hbase-native-client/core/client-test.cc +++ b/hbase-native-client/core/client-test.cc @@ -435,29 +435,27 @@ TEST_F(ClientTest, PutsWithTimestamp) { client.Close(); } -TEST_F(ClientTest, MultiGets) { - // Using TestUtil to populate test data - ClientTest::test_util->CreateTable("t", "d"); - - // Create TableName and Row to be fetched from HBase - auto tn = folly::to<hbase::pb::TableName>("t"); - - // Create a client - hbase::Client 
client(*ClientTest::test_util->conf()); +void SetClientParams() { + ClientTest::test_util->conf()->SetInt("hbase.client.cpu.thread.pool.size", 6); + ClientTest::test_util->conf()->SetInt("hbase.client.operation.timeout", 600000); + ClientTest::test_util->conf()->SetInt("hbase.client.retries.number", 7); + ClientTest::test_util->conf()->SetInt("hbase.client.start.log.errors.counter", 1); +} - // Get connection to HBase Table - auto table = client.Table(tn); +void PerformPuts(uint64_t num_rows, std::shared_ptr<hbase::Client> client, + const std::string &table_name) { + auto tn = folly::to<hbase::pb::TableName>(table_name); + auto table = client->Table(tn); ASSERT_TRUE(table) << "Unable to get connection to Table."; - - uint64_t num_rows = 10000; // Perform Puts for (uint64_t i = 0; i < num_rows; i++) { table->Put(Put{"test" + std::to_string(i)}.AddColumn("d", std::to_string(i), "value" + std::to_string(i))); } +} +void MakeGets(uint64_t num_rows, const std::string &row_prefix, std::vector<hbase::Get> &gets) { // Perform the Gets - std::vector<hbase::Get> gets; for (uint64_t i = 0; i < num_rows; ++i) { auto row = "test" + std::to_string(i); hbase::Get get(row); @@ -465,9 +463,10 @@ TEST_F(ClientTest, MultiGets) { } gets.push_back(hbase::Get("test2")); gets.push_back(hbase::Get("testextra")); +} - auto results = table->Get(gets); - +void TestMultiResults(uint64_t num_rows, const std::vector<std::shared_ptr<hbase::Result>> &results, + const std::vector<hbase::Get> &gets) { // Test the values, should be same as in put executed on hbase shell ASSERT_TRUE(!results.empty()) << "Result vector shouldn't be empty."; @@ -483,6 +482,66 @@ TEST_F(ClientTest, MultiGets) { ++i; ASSERT_TRUE(results[i]->IsEmpty()) << "Result for Get " << gets[i].row() << " must be empty"; +} + +TEST_F(ClientTest, MultiGets) { + std::string table_name = "t"; + // Using TestUtil to populate test data + ClientTest::test_util->CreateTable(table_name, "d"); + + // Create TableName and Row to be fetched 
from HBase + auto tn = folly::to<hbase::pb::TableName>(table_name); + + SetClientParams(); + // Create a client + hbase::Client client(*ClientTest::test_util->conf()); + + uint64_t num_rows = 50000; + PerformPuts(num_rows, std::make_shared<hbase::Client>(client), table_name); + + // Get connection to HBase Table + auto table = client.Table(tn); + ASSERT_TRUE(table) << "Unable to get connection to Table."; + + std::vector<hbase::Get> gets; + MakeGets(num_rows, "test", gets); + + auto results = table->Get(gets); + + TestMultiResults(num_rows, results, gets); + + table->Close(); + client.Close(); +} + +TEST_F(ClientTest, MultiGetsWithRegionSplits) { + // Using TestUtil to populate test data + std::vector<std::string> keys{"test0", "test100", "test200", "test300", "test400", + "test500", "test600", "test700", "test800", "test900"}; + std::string table_name = "t"; + ClientTest::test_util->CreateTable(table_name, "d", keys); + + // Create TableName and Row to be fetched from HBase + auto tn = folly::to<hbase::pb::TableName>(table_name); + + SetClientParams(); + + // Create a client + hbase::Client client(*ClientTest::test_util->conf()); + + uint64_t num_rows = 50000; + PerformPuts(num_rows, std::make_shared<hbase::Client>(client), table_name); + + // Get connection to HBase Table + auto table = client.Table(tn); + ASSERT_TRUE(table) << "Unable to get connection to Table."; + + std::vector<hbase::Get> gets; + MakeGets(num_rows, "test", gets); + + auto results = table->Get(gets); + + TestMultiResults(num_rows, results, gets); table->Close(); client.Close(); http://git-wip-us.apache.org/repos/asf/hbase/blob/a93c6a99/hbase-native-client/core/multi-response.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/multi-response.cc b/hbase-native-client/core/multi-response.cc index f620c98..a4c2108 100644 --- a/hbase-native-client/core/multi-response.cc +++ b/hbase-native-client/core/multi-response.cc @@ -18,6 +18,7 @@ */ 
#include "core/multi-response.h" +#include <glog/logging.h> #include "core/region-result.h" using hbase::pb::RegionLoadStats; @@ -36,35 +37,38 @@ int MultiResponse::Size() const { void MultiResponse::AddRegionResult(const std::string& region_name, int32_t original_index, std::shared_ptr<Result> result, - std::shared_ptr<std::exception> exc) { - bool region_found = false; - for (auto itr = results_.begin(); itr != results_.end(); ++itr) { - if (itr->first == region_name) { - region_found = true; - itr->second->AddResultOrException(original_index, result, exc); - break; - } - } - if (!region_found) { + std::shared_ptr<folly::exception_wrapper> exc) { + auto itr = results_.find(region_name); + if (itr == results_.end()) { auto region_result = std::make_shared<RegionResult>(); region_result->AddResultOrException(original_index, result, exc); results_[region_name] = region_result; + } else { + itr->second->AddResultOrException(original_index, result, exc); } } void MultiResponse::AddRegionException(const std::string& region_name, - std::shared_ptr<std::exception> exception) { - exceptions_[region_name] = exception; + std::shared_ptr<folly::exception_wrapper> exception) { + VLOG(8) << "Store Region Exception:- " << exception->what() << "; Region[" << region_name << "];"; + bool region_found = false; + auto itr = exceptions_.find(region_name); + if (itr == exceptions_.end()) { + auto region_result = std::make_shared<folly::exception_wrapper>(); + exceptions_[region_name] = exception; + } else { + itr->second = exception; + } } -std::shared_ptr<std::exception> MultiResponse::RegionException( +std::shared_ptr<folly::exception_wrapper> MultiResponse::RegionException( const std::string& region_name) const { auto find = exceptions_.at(region_name); return find; } -const std::map<std::string, std::shared_ptr<std::exception> >& MultiResponse::RegionExceptions() - const { +const std::map<std::string, std::shared_ptr<folly::exception_wrapper> >& +MultiResponse::RegionExceptions() 
const { return exceptions_; } http://git-wip-us.apache.org/repos/asf/hbase/blob/a93c6a99/hbase-native-client/core/multi-response.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/multi-response.h b/hbase-native-client/core/multi-response.h index 96883fd..d38cfd6 100644 --- a/hbase-native-client/core/multi-response.h +++ b/hbase-native-client/core/multi-response.h @@ -20,6 +20,7 @@ #pragma once #include <core/region-result.h> +#include <folly/ExceptionWrapper.h> #include <exception> #include <map> #include <memory> @@ -46,17 +47,18 @@ class MultiResponse { * @param resOrEx the result or error; will be empty for successful Put and Delete actions. */ void AddRegionResult(const std::string& region_name, int32_t original_index, - std::shared_ptr<Result> result, std::shared_ptr<std::exception> exc); + std::shared_ptr<Result> result, + std::shared_ptr<folly::exception_wrapper> exc); void AddRegionException(const std::string& region_name, - std::shared_ptr<std::exception> exception); + std::shared_ptr<folly::exception_wrapper> exception); /** * @return the exception for the region, if any. Null otherwise. 
*/ - std::shared_ptr<std::exception> RegionException(const std::string& region_name) const; + std::shared_ptr<folly::exception_wrapper> RegionException(const std::string& region_name) const; - const std::map<std::string, std::shared_ptr<std::exception>>& RegionExceptions() const; + const std::map<std::string, std::shared_ptr<folly::exception_wrapper>>& RegionExceptions() const; void AddStatistic(const std::string& region_name, std::shared_ptr<pb::RegionLoadStats> stat); @@ -66,12 +68,12 @@ class MultiResponse { private: // map of regionName to map of Results by the original index for that Result - std::map<std::string, std::shared_ptr<hbase::RegionResult>> results_; + std::map<std::string, std::shared_ptr<RegionResult>> results_; /** * The server can send us a failure for the region itself, instead of individual failure. * It's a part of the protobuf definition. */ - std::map<std::string, std::shared_ptr<std::exception>> exceptions_; + std::map<std::string, std::shared_ptr<folly::exception_wrapper>> exceptions_; }; } /* namespace hbase */ http://git-wip-us.apache.org/repos/asf/hbase/blob/a93c6a99/hbase-native-client/core/region-result.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/region-result.cc b/hbase-native-client/core/region-result.cc index 05ab274..206c876 100644 --- a/hbase-native-client/core/region-result.cc +++ b/hbase-native-client/core/region-result.cc @@ -30,7 +30,7 @@ RegionResult::RegionResult() {} RegionResult::~RegionResult() {} void RegionResult::AddResultOrException(int32_t index, std::shared_ptr<hbase::Result> result, - std::shared_ptr<std::exception> exc) { + std::shared_ptr<folly::exception_wrapper> exc) { auto index_found = result_or_excption_.find(index); if (index_found == result_or_excption_.end()) { result_or_excption_[index] = std::make_tuple(result ? result : nullptr, exc ? 
exc : nullptr); http://git-wip-us.apache.org/repos/asf/hbase/blob/a93c6a99/hbase-native-client/core/region-result.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/region-result.h b/hbase-native-client/core/region-result.h index cfd9e5a..b961634 100644 --- a/hbase-native-client/core/region-result.h +++ b/hbase-native-client/core/region-result.h @@ -19,6 +19,7 @@ #pragma once +#include <folly/ExceptionWrapper.h> #include <map> #include <memory> #include <string> @@ -29,13 +30,13 @@ namespace hbase { using ResultOrExceptionTuple = - std::tuple<std::shared_ptr<hbase::Result>, std::shared_ptr<std::exception>>; + std::tuple<std::shared_ptr<hbase::Result>, std::shared_ptr<folly::exception_wrapper>>; class RegionResult { public: RegionResult(); void AddResultOrException(int32_t index, std::shared_ptr<hbase::Result> result, - std::shared_ptr<std::exception> exc); + std::shared_ptr<folly::exception_wrapper> exc); void set_stat(std::shared_ptr<pb::RegionLoadStats> stat); http://git-wip-us.apache.org/repos/asf/hbase/blob/a93c6a99/hbase-native-client/core/response-converter.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/response-converter.cc b/hbase-native-client/core/response-converter.cc index 4f9bfb1..960c487 100644 --- a/hbase-native-client/core/response-converter.cc +++ b/hbase-native-client/core/response-converter.cc @@ -18,7 +18,6 @@ */ #include "core/response-converter.h" - #include <glog/logging.h> #include <stdexcept> #include <string> @@ -125,8 +124,9 @@ std::vector<std::shared_ptr<Result>> ResponseConverter::FromScanResponse( return results; } -std::unique_ptr<hbase::MultiResponse> ResponseConverter::GetResults(std::shared_ptr<Request> req, - const Response& resp) { +std::unique_ptr<hbase::MultiResponse> ResponseConverter::GetResults( + std::shared_ptr<Request> req, const Response& resp, + const ServerRequest::ActionsByRegion& 
actions_by_region) { auto multi_req = std::static_pointer_cast<hbase::pb::MultiRequest>(req->req_msg()); auto multi_resp = std::static_pointer_cast<hbase::pb::MultiResponse>(resp.resp_msg()); VLOG(3) << "GetResults:" << multi_resp->ShortDebugString(); @@ -148,11 +148,10 @@ std::unique_ptr<hbase::MultiResponse> ResponseConverter::GetResults(std::shared_ auto region_name = rs.value(); if (action_result.has_exception()) { - if (action_result.exception().has_value()) { - auto exc = std::make_shared<hbase::IOException>(action_result.exception().value()); - VLOG(8) << "Store Region Exception:- " << exc->what(); - multi_response->AddRegionException(region_name, exc); - } + auto ew = ResponseConverter::GetRemoteException(action_result.exception()); + VLOG(8) << "Store Remote Region Exception:- " << ew->what().toStdString() << "; Region[" + << region_name << "];"; + multi_response->AddRegionException(region_name, ew); continue; } @@ -163,14 +162,16 @@ std::unique_ptr<hbase::MultiResponse> ResponseConverter::GetResults(std::shared_ " for region " + actions.region().value()); } + auto multi_actions = actions_by_region.at(region_name)->actions(); + uint64_t multi_actions_num = 0; for (hbase::pb::ResultOrException roe : action_result.resultorexception()) { std::shared_ptr<Result> result; - std::shared_ptr<std::exception> exc; + std::shared_ptr<folly::exception_wrapper> ew; if (roe.has_exception()) { - if (roe.exception().has_value()) { - exc = std::make_shared<hbase::IOException>(roe.exception().value()); - VLOG(8) << "Store ResultOrException:- " << exc->what(); - } + auto ew = ResponseConverter::GetRemoteException(roe.exception()); + VLOG(8) << "Store Remote Region Exception:- " << ew->what().toStdString() << "; Region[" + << region_name << "];"; + multi_response->AddRegionException(region_name, ew); } else if (roe.has_result()) { result = ToResult(roe.result(), resp.cell_scanner()); } else if (roe.has_service_result()) { @@ -183,7 +184,11 @@ 
std::unique_ptr<hbase::MultiResponse> ResponseConverter::GetResults(std::shared_ result = std::make_shared<Result>(empty_cells, multi_resp->processed() ? true : false, false, false); } - multi_response->AddRegionResult(region_name, roe.index(), std::move(result), exc); + // We add the original index of the multi-action so that when populating the response back we + // do it as per the action index + multi_response->AddRegionResult( + region_name, multi_actions[multi_actions_num]->original_index(), std::move(result), ew); + multi_actions_num++; } } @@ -196,4 +201,21 @@ std::unique_ptr<hbase::MultiResponse> ResponseConverter::GetResults(std::shared_ } return multi_response; } + +std::shared_ptr<folly::exception_wrapper> ResponseConverter::GetRemoteException( + const hbase::pb::NameBytesPair& exc_resp) { + std::string what; + std::string exception_class_name = exc_resp.has_name() ? exc_resp.name() : ""; + std::string stack_trace = exc_resp.has_value() ? exc_resp.value() : ""; + + what.append(exception_class_name).append(stack_trace); + auto remote_exception = std::make_unique<RemoteException>(what); + remote_exception->set_exception_class_name(exception_class_name) + ->set_stack_trace(stack_trace) + ->set_hostname("") + ->set_port(0); + + return std::make_shared<folly::exception_wrapper>( + folly::make_exception_wrapper<RemoteException>(*remote_exception)); +} } /* namespace hbase */ http://git-wip-us.apache.org/repos/asf/hbase/blob/a93c6a99/hbase-native-client/core/response-converter.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/response-converter.h b/hbase-native-client/core/response-converter.h index 2f8f279..edd4165 100644 --- a/hbase-native-client/core/response-converter.h +++ b/hbase-native-client/core/response-converter.h @@ -25,6 +25,7 @@ #include "connection/response.h" #include "core/multi-response.h" #include "core/result.h" +#include "core/server-request.h" #include "if/Client.pb.h" #include 
"serde/cell-scanner.h" @@ -56,12 +57,15 @@ class ResponseConverter { static std::vector<std::shared_ptr<Result>> FromScanResponse( const std::shared_ptr<pb::ScanResponse> resp, std::shared_ptr<CellScanner> cell_scanner); - static std::unique_ptr<hbase::MultiResponse> GetResults(std::shared_ptr<Request> req, - const Response& resp); + static std::unique_ptr<hbase::MultiResponse> GetResults( + std::shared_ptr<Request> req, const Response& resp, + const ServerRequest::ActionsByRegion& actions_by_region); private: // Constructor not required. We have all static methods to extract response from PB messages. ResponseConverter(); + static std::shared_ptr<folly::exception_wrapper> GetRemoteException( + const hbase::pb::NameBytesPair& exc_resp); }; } /* namespace hbase */ http://git-wip-us.apache.org/repos/asf/hbase/blob/a93c6a99/hbase-native-client/core/server-request.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/server-request.h b/hbase-native-client/core/server-request.h index 7f31c2b..85df9ed 100644 --- a/hbase-native-client/core/server-request.h +++ b/hbase-native-client/core/server-request.h @@ -44,8 +44,14 @@ class ServerRequest { void AddActionsByRegion(std::shared_ptr<RegionLocation> region_location, std::shared_ptr<Action> action) { auto region_name = region_location->region_name(); - auto itr = actions_by_region_.at(region_name); - itr->AddAction(action); + auto search = actions_by_region_.find(region_name); + if (search == actions_by_region_.end()) { + auto region_request = std::make_shared<RegionRequest>(region_location); + actions_by_region_[region_name] = region_request; + actions_by_region_[region_name]->AddAction(action); + } else { + search->second->AddAction(action); + } } const ActionsByRegion &actions_by_region() const { return actions_by_region_; }