http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/core/async-rpc-retrying-caller-factory.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/async-rpc-retrying-caller-factory.h b/hbase-native-client/core/async-rpc-retrying-caller-factory.h deleted file mode 100644 index 188f469..0000000 --- a/hbase-native-client/core/async-rpc-retrying-caller-factory.h +++ /dev/null @@ -1,343 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- * - */ -#pragma once - -#include <folly/Logging.h> -#include <folly/io/async/EventBase.h> -#include <chrono> -#include <memory> -#include <string> -#include <vector> - -#include "connection/rpc-client.h" -#include "core/async-batch-rpc-retrying-caller.h" -#include "core/async-rpc-retrying-caller.h" -#include "core/async-scan-rpc-retrying-caller.h" -#include "core/raw-scan-result-consumer.h" -#include "core/region-location.h" -#include "core/row.h" -#include "core/scan-result-cache.h" -#include "core/scan.h" - -#include "if/Client.pb.h" -#include "if/HBase.pb.h" - -namespace hbase { - -class AsyncConnection; - -template <typename RESP> -class SingleRequestCallerBuilder - : public std::enable_shared_from_this<SingleRequestCallerBuilder<RESP>> { - public: - explicit SingleRequestCallerBuilder(std::shared_ptr<AsyncConnection> conn, - std::shared_ptr<folly::HHWheelTimer> retry_timer) - : conn_(conn), - retry_timer_(retry_timer), - table_name_(nullptr), - rpc_timeout_nanos_(conn->connection_conf()->rpc_timeout()), - pause_(conn->connection_conf()->pause()), - operation_timeout_nanos_(conn->connection_conf()->operation_timeout()), - max_retries_(conn->connection_conf()->max_retries()), - start_log_errors_count_(conn->connection_conf()->start_log_errors_count()), - locate_type_(RegionLocateType::kCurrent) {} - - virtual ~SingleRequestCallerBuilder() = default; - - typedef SingleRequestCallerBuilder<RESP> GenericThisType; - typedef std::shared_ptr<GenericThisType> SharedThisPtr; - - SharedThisPtr table(std::shared_ptr<pb::TableName> table_name) { - table_name_ = table_name; - return shared_this(); - } - - SharedThisPtr rpc_timeout(std::chrono::nanoseconds rpc_timeout_nanos) { - rpc_timeout_nanos_ = rpc_timeout_nanos; - return shared_this(); - } - - SharedThisPtr operation_timeout(std::chrono::nanoseconds operation_timeout_nanos) { - operation_timeout_nanos_ = operation_timeout_nanos; - return shared_this(); - } - - SharedThisPtr pause(std::chrono::nanoseconds pause) { - 
pause_ = pause; - return shared_this(); - } - - SharedThisPtr max_retries(uint32_t max_retries) { - max_retries_ = max_retries; - return shared_this(); - } - - SharedThisPtr start_log_errors_count(uint32_t start_log_errors_count) { - start_log_errors_count_ = start_log_errors_count; - return shared_this(); - } - - SharedThisPtr row(const std::string& row) { - row_ = row; - return shared_this(); - } - - SharedThisPtr locate_type(RegionLocateType locate_type) { - locate_type_ = locate_type; - return shared_this(); - } - - SharedThisPtr action(Callable<RESP> callable) { - callable_ = callable; - return shared_this(); - } - - folly::Future<RESP> Call() { return Build()->Call(); } - - std::shared_ptr<AsyncSingleRequestRpcRetryingCaller<RESP>> Build() { - return std::make_shared<AsyncSingleRequestRpcRetryingCaller<RESP>>( - conn_, retry_timer_, table_name_, row_, locate_type_, callable_, pause_, max_retries_, - operation_timeout_nanos_, rpc_timeout_nanos_, start_log_errors_count_); - } - - private: - SharedThisPtr shared_this() { - return std::enable_shared_from_this<GenericThisType>::shared_from_this(); - } - - private: - std::shared_ptr<AsyncConnection> conn_; - std::shared_ptr<folly::HHWheelTimer> retry_timer_; - std::shared_ptr<pb::TableName> table_name_; - std::chrono::nanoseconds rpc_timeout_nanos_; - std::chrono::nanoseconds operation_timeout_nanos_; - std::chrono::nanoseconds pause_; - uint32_t max_retries_; - uint32_t start_log_errors_count_; - std::string row_; - RegionLocateType locate_type_; - Callable<RESP> callable_; -}; // end of SingleRequestCallerBuilder - -template <typename REQ, typename RESP> -class BatchCallerBuilder : public std::enable_shared_from_this<BatchCallerBuilder<REQ, RESP>> { - public: - explicit BatchCallerBuilder(std::shared_ptr<AsyncConnection> conn, - std::shared_ptr<folly::HHWheelTimer> retry_timer) - : conn_(conn), retry_timer_(retry_timer) {} - - virtual ~BatchCallerBuilder() = default; - - typedef 
std::shared_ptr<BatchCallerBuilder<REQ, RESP>> SharedThisPtr; - - SharedThisPtr table(std::shared_ptr<pb::TableName> table_name) { - table_name_ = table_name; - return shared_this(); - } - - SharedThisPtr actions(std::shared_ptr<std::vector<REQ>> actions) { - actions_ = actions; - return shared_this(); - } - - SharedThisPtr operation_timeout(std::chrono::nanoseconds operation_timeout_nanos) { - operation_timeout_nanos_ = operation_timeout_nanos; - return shared_this(); - } - - SharedThisPtr rpc_timeout(std::chrono::nanoseconds rpc_timeout_nanos) { - rpc_timeout_nanos_ = rpc_timeout_nanos; - return shared_this(); - } - - SharedThisPtr pause(std::chrono::nanoseconds pause_ns) { - pause_ns_ = pause_ns; - return shared_this(); - } - - SharedThisPtr max_attempts(int32_t max_attempts) { - max_attempts_ = max_attempts; - return shared_this(); - } - - SharedThisPtr start_log_errors_count(int32_t start_log_errors_count) { - start_log_errors_count_ = start_log_errors_count; - return shared_this(); - } - - folly::Future<std::vector<folly::Try<RESP>>> Call() { return Build()->Call(); } - - std::shared_ptr<AsyncBatchRpcRetryingCaller<REQ, RESP>> Build() { - return std::make_shared<AsyncBatchRpcRetryingCaller<REQ, RESP>>( - conn_, retry_timer_, table_name_, *actions_, pause_ns_, max_attempts_, - operation_timeout_nanos_, rpc_timeout_nanos_, start_log_errors_count_); - } - - private: - SharedThisPtr shared_this() { - return std::enable_shared_from_this<BatchCallerBuilder>::shared_from_this(); - } - - private: - std::shared_ptr<AsyncConnection> conn_; - std::shared_ptr<folly::HHWheelTimer> retry_timer_; - std::shared_ptr<hbase::pb::TableName> table_name_ = nullptr; - std::shared_ptr<std::vector<REQ>> actions_ = nullptr; - std::chrono::nanoseconds pause_ns_; - int32_t max_attempts_ = 0; - std::chrono::nanoseconds operation_timeout_nanos_; - std::chrono::nanoseconds rpc_timeout_nanos_; - int32_t start_log_errors_count_ = 0; -}; - -class ScanCallerBuilder : public 
std::enable_shared_from_this<ScanCallerBuilder> { - public: - explicit ScanCallerBuilder(std::shared_ptr<AsyncConnection> conn, - std::shared_ptr<folly::HHWheelTimer> retry_timer) - : conn_(conn), - retry_timer_(retry_timer), - rpc_timeout_nanos_(conn->connection_conf()->rpc_timeout()), - pause_(conn->connection_conf()->pause()), - scan_timeout_nanos_(conn->connection_conf()->scan_timeout()), - max_retries_(conn->connection_conf()->max_retries()), - start_log_errors_count_(conn->connection_conf()->start_log_errors_count()), - scanner_id_(-1) {} - - virtual ~ScanCallerBuilder() = default; - - typedef ScanCallerBuilder GenericThisType; - typedef std::shared_ptr<ScanCallerBuilder> SharedThisPtr; - - SharedThisPtr rpc_client(std::shared_ptr<hbase::RpcClient> rpc_client) { - rpc_client_ = rpc_client; - return shared_this(); - } - - SharedThisPtr rpc_timeout(nanoseconds rpc_timeout_nanos) { - rpc_timeout_nanos_ = rpc_timeout_nanos; - return shared_this(); - } - - SharedThisPtr scan_timeout(nanoseconds scan_timeout_nanos) { - scan_timeout_nanos_ = scan_timeout_nanos; - return shared_this(); - } - - SharedThisPtr scanner_lease_timeout(nanoseconds scanner_lease_timeout_nanos) { - scanner_lease_timeout_nanos_ = scanner_lease_timeout_nanos; - return shared_this(); - } - - SharedThisPtr pause(nanoseconds pause) { - pause_ = pause; - return shared_this(); - } - - SharedThisPtr max_retries(uint32_t max_retries) { - max_retries_ = max_retries; - return shared_this(); - } - - SharedThisPtr start_log_errors_count(uint32_t start_log_errors_count) { - start_log_errors_count_ = start_log_errors_count; - return shared_this(); - } - - SharedThisPtr region_location(std::shared_ptr<RegionLocation> region_location) { - region_location_ = region_location; - return shared_this(); - } - - SharedThisPtr scanner_id(int64_t scanner_id) { - scanner_id_ = scanner_id; - return shared_this(); - } - - SharedThisPtr scan(std::shared_ptr<Scan> scan) { - scan_ = scan; - return shared_this(); - } - - 
SharedThisPtr results_cache(std::shared_ptr<ScanResultCache> results_cache) { - results_cache_ = results_cache; - return shared_this(); - } - - SharedThisPtr consumer(std::shared_ptr<RawScanResultConsumer> consumer) { - consumer_ = consumer; - return shared_this(); - } - - std::shared_ptr<AsyncScanRpcRetryingCaller> Build() { - return std::make_shared<AsyncScanRpcRetryingCaller>( - conn_, retry_timer_, rpc_client_, scan_, scanner_id_, results_cache_, consumer_, - region_location_, scanner_lease_timeout_nanos_, pause_, max_retries_, scan_timeout_nanos_, - rpc_timeout_nanos_, start_log_errors_count_); - } - - private: - SharedThisPtr shared_this() { - return std::enable_shared_from_this<GenericThisType>::shared_from_this(); - } - - private: - std::shared_ptr<AsyncConnection> conn_; - std::shared_ptr<folly::HHWheelTimer> retry_timer_; - std::shared_ptr<hbase::RpcClient> rpc_client_; - std::shared_ptr<Scan> scan_; - nanoseconds rpc_timeout_nanos_; - nanoseconds scan_timeout_nanos_; - nanoseconds scanner_lease_timeout_nanos_; - nanoseconds pause_; - uint32_t max_retries_; - uint32_t start_log_errors_count_; - std::shared_ptr<RegionLocation> region_location_; - int64_t scanner_id_; - std::shared_ptr<RawScanResultConsumer> consumer_; - std::shared_ptr<ScanResultCache> results_cache_; -}; // end of ScanCallerBuilder - -class AsyncRpcRetryingCallerFactory { - private: - std::shared_ptr<AsyncConnection> conn_; - std::shared_ptr<folly::HHWheelTimer> retry_timer_; - - public: - explicit AsyncRpcRetryingCallerFactory(std::shared_ptr<AsyncConnection> conn, - std::shared_ptr<folly::HHWheelTimer> retry_timer) - : conn_(conn), retry_timer_(retry_timer) {} - - virtual ~AsyncRpcRetryingCallerFactory() = default; - - template <typename RESP> - std::shared_ptr<SingleRequestCallerBuilder<RESP>> Single() { - return std::make_shared<SingleRequestCallerBuilder<RESP>>(conn_, retry_timer_); - } - - template <typename REQ, typename RESP> - std::shared_ptr<BatchCallerBuilder<REQ, RESP>> 
Batch() { - return std::make_shared<BatchCallerBuilder<REQ, RESP>>(conn_, retry_timer_); - } - - std::shared_ptr<ScanCallerBuilder> Scan() { - return std::make_shared<ScanCallerBuilder>(conn_, retry_timer_); - } -}; - -} // namespace hbase
http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/core/async-rpc-retrying-caller.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/async-rpc-retrying-caller.cc b/hbase-native-client/core/async-rpc-retrying-caller.cc deleted file mode 100644 index 8e60991..0000000 --- a/hbase-native-client/core/async-rpc-retrying-caller.cc +++ /dev/null @@ -1,230 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- * - */ - -#include "core/async-rpc-retrying-caller.h" - -#include <folly/Conv.h> -#include <folly/ExceptionWrapper.h> -#include <folly/Format.h> -#include <folly/Logging.h> -#include <folly/Unit.h> - -#include "connection/rpc-client.h" -#include "core/async-connection.h" -#include "core/hbase-rpc-controller.h" -#include "core/region-location.h" -#include "core/result.h" -#include "exceptions/exception.h" -#include "if/HBase.pb.h" -#include "utils/connection-util.h" -#include "utils/sys-util.h" -#include "utils/time-util.h" - -using folly::exception_wrapper; - -namespace hbase { - -template <typename RESP> -AsyncSingleRequestRpcRetryingCaller<RESP>::AsyncSingleRequestRpcRetryingCaller( - std::shared_ptr<AsyncConnection> conn, std::shared_ptr<folly::HHWheelTimer> retry_timer, - std::shared_ptr<hbase::pb::TableName> table_name, const std::string& row, - RegionLocateType locate_type, Callable<RESP> callable, std::chrono::nanoseconds pause, - uint32_t max_retries, std::chrono::nanoseconds operation_timeout_nanos, - std::chrono::nanoseconds rpc_timeout_nanos, uint32_t start_log_errors_count) - : conn_(conn), - retry_timer_(retry_timer), - table_name_(table_name), - row_(row), - locate_type_(locate_type), - callable_(callable), - pause_(pause), - max_retries_(max_retries), - operation_timeout_nanos_(operation_timeout_nanos), - rpc_timeout_nanos_(rpc_timeout_nanos), - start_log_errors_count_(start_log_errors_count), - promise_(std::make_shared<folly::Promise<RESP>>()), - tries_(1) { - controller_ = conn_->CreateRpcController(); - start_ns_ = TimeUtil::GetNowNanos(); - max_attempts_ = ConnectionUtils::Retries2Attempts(max_retries); - exceptions_ = std::make_shared<std::vector<ThrowableWithExtraContext>>(); -} - -template <typename RESP> -AsyncSingleRequestRpcRetryingCaller<RESP>::~AsyncSingleRequestRpcRetryingCaller() {} - -template <typename RESP> -folly::Future<RESP> AsyncSingleRequestRpcRetryingCaller<RESP>::Call() { - auto f = promise_->getFuture(); - 
LocateThenCall(); - return f; -} - -template <typename RESP> -void AsyncSingleRequestRpcRetryingCaller<RESP>::LocateThenCall() { - int64_t locate_timeout_ns; - if (operation_timeout_nanos_.count() > 0) { - locate_timeout_ns = RemainingTimeNs(); - if (locate_timeout_ns <= 0) { - CompleteExceptionally(); - return; - } - } else { - locate_timeout_ns = -1L; - } - - conn_->region_locator() - ->LocateRegion(*table_name_, row_, locate_type_, locate_timeout_ns) - .then([this](std::shared_ptr<RegionLocation> loc) { Call(*loc); }) - .onError([this](const exception_wrapper& e) { - OnError(e, - [this, e]() -> std::string { - return "Locate '" + row_ + "' in " + table_name_->namespace_() + "::" + - table_name_->qualifier() + " failed with e.what()=" + - e.what().toStdString() + ", tries = " + std::to_string(tries_) + - ", maxAttempts = " + std::to_string(max_attempts_) + ", timeout = " + - TimeUtil::ToMillisStr(operation_timeout_nanos_) + " ms, time elapsed = " + - TimeUtil::ElapsedMillisStr(this->start_ns_) + " ms"; - }, - [](const exception_wrapper& error) {}); - }); -} - -template <typename RESP> -void AsyncSingleRequestRpcRetryingCaller<RESP>::OnError( - const exception_wrapper& error, Supplier<std::string> err_msg, - Consumer<exception_wrapper> update_cached_location) { - ThrowableWithExtraContext twec(error, TimeUtil::GetNowNanos()); - exceptions_->push_back(twec); - if (!ExceptionUtil::ShouldRetry(error) || tries_ >= max_retries_) { - CompleteExceptionally(); - return; - } - - if (tries_ > start_log_errors_count_) { - LOG(WARNING) << err_msg(); - } else { - VLOG(1) << err_msg(); - } - - int64_t delay_ns; - if (operation_timeout_nanos_.count() > 0) { - int64_t max_delay_ns = RemainingTimeNs() - ConnectionUtils::kSleepDeltaNs; - if (max_delay_ns <= 0) { - CompleteExceptionally(); - return; - } - delay_ns = std::min(max_delay_ns, ConnectionUtils::GetPauseTime(pause_.count(), tries_ - 1)); - } else { - delay_ns = ConnectionUtils::GetPauseTime(pause_.count(), tries_ - 1); - } 
- update_cached_location(error); - tries_++; - - /* - * The HHWheelTimer::scheduleTimeout() fails with an assertion from - * EventBase::isInEventBaseThread() if we execute the schedule in a random thread, or one of - * the IOThreadPool threads (with num threads > 1). I think there is a bug there in using retry - * timer from IOThreadPool threads. It only works when executed from a single-thread pool - * (retry_executor() is). However, the scheduled "work" which is the LocateThenCall() should - * still happen in a thread pool, that is why we are submitting the work to the CPUThreadPool. - * IOThreadPool cannot be used without fixing the blocking call that we do at TCP connection - * establishment time (see ConnectionFactory::Connect()), otherwise, the IOThreadPool thread - * just hangs because it deadlocks itself. - */ - conn_->retry_executor()->add([=]() { - retry_timer_->scheduleTimeoutFn( - [=]() { conn_->cpu_executor()->add([&]() { LocateThenCall(); }); }, - std::chrono::milliseconds(TimeUtil::ToMillis(delay_ns))); - }); -} - -template <typename RESP> -void AsyncSingleRequestRpcRetryingCaller<RESP>::Call(const RegionLocation& loc) { - int64_t call_timeout_ns; - if (operation_timeout_nanos_.count() > 0) { - call_timeout_ns = this->RemainingTimeNs(); - if (call_timeout_ns <= 0) { - this->CompleteExceptionally(); - return; - } - call_timeout_ns = std::min(call_timeout_ns, rpc_timeout_nanos_.count()); - } else { - call_timeout_ns = rpc_timeout_nanos_.count(); - } - - std::shared_ptr<RpcClient> rpc_client; - - rpc_client = conn_->rpc_client(); - - ResetController(controller_, call_timeout_ns); - - // TODO: RegionLocation should propagate through these method chains as a shared_ptr. - // Otherwise, it may get deleted underneat us. We are just copying for now. 
- auto loc_ptr = std::make_shared<RegionLocation>(loc); - callable_(controller_, loc_ptr, rpc_client) - .then([loc_ptr, this](const RESP& resp) { this->promise_->setValue(std::move(resp)); }) - .onError([&, loc_ptr, this](const exception_wrapper& e) { - OnError( - e, - [&, this, e]() -> std::string { - return "Call to " + folly::sformat("{0}:{1}", loc_ptr->server_name().host_name(), - loc_ptr->server_name().port()) + - " for '" + row_ + "' in " + loc_ptr->DebugString() + " of " + - table_name_->namespace_() + "::" + table_name_->qualifier() + - " failed with e.what()=" + e.what().toStdString() + ", tries = " + - std::to_string(tries_) + ", maxAttempts = " + std::to_string(max_attempts_) + - ", timeout = " + TimeUtil::ToMillisStr(this->operation_timeout_nanos_) + - " ms, time elapsed = " + TimeUtil::ElapsedMillisStr(this->start_ns_) + " ms"; - }, - [&, this](const exception_wrapper& error) { - conn_->region_locator()->UpdateCachedLocation(*loc_ptr, error); - }); - }); -} - -template <typename RESP> -void AsyncSingleRequestRpcRetryingCaller<RESP>::CompleteExceptionally() { - this->promise_->setException(RetriesExhaustedException(tries_ - 1, exceptions_)); -} - -template <typename RESP> -int64_t AsyncSingleRequestRpcRetryingCaller<RESP>::RemainingTimeNs() { - return operation_timeout_nanos_.count() - (TimeUtil::GetNowNanos() - start_ns_); -} - -template <typename RESP> -void AsyncSingleRequestRpcRetryingCaller<RESP>::ResetController( - std::shared_ptr<HBaseRpcController> controller, const int64_t& timeout_ns) { - controller->Reset(); - if (timeout_ns >= 0) { - controller->set_call_timeout(std::chrono::milliseconds( - std::min(static_cast<int64_t>(INT_MAX), TimeUtil::ToMillis(timeout_ns)))); - } -} - -// explicit instantiations for the linker. Otherwise, you have to #include the .cc file for the -// templetized -// class definitions. 
-class OpenScannerResponse; -template class AsyncSingleRequestRpcRetryingCaller<std::shared_ptr<hbase::Result>>; -template class AsyncSingleRequestRpcRetryingCaller<folly::Unit>; -template class AsyncSingleRequestRpcRetryingCaller<std::shared_ptr<OpenScannerResponse>>; -template class AsyncSingleRequestRpcRetryingCaller<bool>; -} /* namespace hbase */ http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/core/async-rpc-retrying-caller.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/async-rpc-retrying-caller.h b/hbase-native-client/core/async-rpc-retrying-caller.h deleted file mode 100644 index c7e28d0..0000000 --- a/hbase-native-client/core/async-rpc-retrying-caller.h +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- * - */ -#pragma once - -#include <folly/ExceptionWrapper.h> -#include <folly/futures/Future.h> -#include <folly/io/async/EventBase.h> -#include <folly/io/async/HHWheelTimer.h> - -#include <algorithm> -#include <chrono> -#include <functional> -#include <memory> -#include <string> -#include <type_traits> -#include <utility> -#include <vector> -#include "core/async-connection.h" -#include "core/hbase-rpc-controller.h" -#include "core/region-location.h" -#include "exceptions/exception.h" -#include "if/HBase.pb.h" - -namespace hbase { - -template <typename T> -using Supplier = std::function<T()>; - -template <typename T> -using Consumer = std::function<void(T)>; - -template <typename R, typename S, typename... I> -using ReqConverter = std::function<R(const S&, const I&...)>; - -template <typename R, typename S> -using RespConverter = std::function<R(const S&)>; - -template <typename RESP> -using RpcCallback = std::function<void(const RESP&)>; - -template <typename REQ, typename RESP> -using RpcCall = std::function<folly::Future<std::unique_ptr<RESP>>( - std::shared_ptr<RpcClient>, std::shared_ptr<RegionLocation>, - std::shared_ptr<HBaseRpcController>, std::unique_ptr<REQ>)>; - -template <typename RESP> -using Callable = - std::function<folly::Future<RESP>(std::shared_ptr<HBaseRpcController>, - std::shared_ptr<RegionLocation>, std::shared_ptr<RpcClient>)>; - -template <typename RESP> -class AsyncSingleRequestRpcRetryingCaller { - public: - AsyncSingleRequestRpcRetryingCaller( - std::shared_ptr<AsyncConnection> conn, std::shared_ptr<folly::HHWheelTimer> retry_timer, - std::shared_ptr<hbase::pb::TableName> table_name, const std::string& row, - RegionLocateType locate_type, Callable<RESP> callable, std::chrono::nanoseconds pause, - uint32_t max_retries, std::chrono::nanoseconds operation_timeout_nanos, - std::chrono::nanoseconds rpc_timeout_nanos, uint32_t start_log_errors_count); - - virtual ~AsyncSingleRequestRpcRetryingCaller(); - - folly::Future<RESP> Call(); - - 
private: - void LocateThenCall(); - - void OnError(const folly::exception_wrapper& error, Supplier<std::string> err_msg, - Consumer<folly::exception_wrapper> update_cached_location); - - void Call(const RegionLocation& loc); - - void CompleteExceptionally(); - - int64_t RemainingTimeNs(); - - static void ResetController(std::shared_ptr<HBaseRpcController> controller, - const int64_t& timeout_ns); - - private: - std::shared_ptr<AsyncConnection> conn_; - std::shared_ptr<folly::HHWheelTimer> retry_timer_; - std::shared_ptr<hbase::pb::TableName> table_name_; - std::string row_; - RegionLocateType locate_type_; - Callable<RESP> callable_; - std::chrono::nanoseconds pause_; - uint32_t max_retries_; - std::chrono::nanoseconds operation_timeout_nanos_; - std::chrono::nanoseconds rpc_timeout_nanos_; - uint32_t start_log_errors_count_; - std::shared_ptr<folly::Promise<RESP>> promise_; - std::shared_ptr<HBaseRpcController> controller_; - uint64_t start_ns_; - uint32_t tries_; - std::shared_ptr<std::vector<ThrowableWithExtraContext>> exceptions_; - uint32_t max_attempts_; -}; -} /* namespace hbase */ http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/core/async-rpc-retrying-test.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/async-rpc-retrying-test.cc b/hbase-native-client/core/async-rpc-retrying-test.cc deleted file mode 100644 index 2eb82a9..0000000 --- a/hbase-native-client/core/async-rpc-retrying-test.cc +++ /dev/null @@ -1,420 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include <folly/Logging.h> -#include <folly/Memory.h> -#include <folly/futures/Future.h> -#include <folly/io/async/EventBase.h> -#include <folly/io/async/ScopedEventBaseThread.h> -#include <gmock/gmock.h> -#include <google/protobuf/stubs/callback.h> -#include <wangle/concurrent/IOThreadPoolExecutor.h> - -#include <chrono> -#include <functional> -#include <string> - -#include "connection/request.h" -#include "connection/response.h" -#include "connection/rpc-client.h" -#include "core/async-connection.h" -#include "core/async-rpc-retrying-caller-factory.h" -#include "core/async-rpc-retrying-caller.h" -#include "core/client.h" -#include "core/connection-configuration.h" -#include "core/hbase-rpc-controller.h" -#include "core/keyvalue-codec.h" -#include "core/region-location.h" -#include "core/request-converter.h" -#include "core/response-converter.h" -#include "core/result.h" -#include "exceptions/exception.h" -#include "if/Client.pb.h" -#include "if/HBase.pb.h" -#include "test-util/test-util.h" -#include "utils/time-util.h" - -using hbase::AsyncRpcRetryingCallerFactory; -using hbase::AsyncConnection; -using hbase::AsyncRegionLocator; -using hbase::ConnectionConfiguration; -using hbase::Configuration; -using hbase::HBaseRpcController; -using hbase::RegionLocation; -using hbase::RegionLocateType; -using hbase::RpcClient; -using hbase::RequestConverter; -using hbase::ResponseConverter; -using hbase::ReqConverter; -using hbase::RespConverter; -using hbase::Put; -using hbase::TimeUtil; -using hbase::Client; -using hbase::security::User; - -using 
::testing::Return; -using ::testing::_; -using std::chrono::nanoseconds; -using std::chrono::milliseconds; - -using namespace hbase; - -using folly::exception_wrapper; - -class AsyncRpcRetryTest : public ::testing::Test { - public: - static std::unique_ptr<hbase::TestUtil> test_util; - - static void SetUpTestCase() { - google::InstallFailureSignalHandler(); - test_util = std::make_unique<hbase::TestUtil>(); - test_util->StartMiniCluster(2); - } -}; -std::unique_ptr<hbase::TestUtil> AsyncRpcRetryTest::test_util = nullptr; - -class AsyncRegionLocatorBase : public AsyncRegionLocator { - public: - AsyncRegionLocatorBase() {} - explicit AsyncRegionLocatorBase(std::shared_ptr<RegionLocation> region_location) - : region_location_(region_location) {} - virtual ~AsyncRegionLocatorBase() = default; - - folly::Future<std::shared_ptr<hbase::RegionLocation>> LocateRegion(const hbase::pb::TableName &, - const std::string &, - const RegionLocateType, - const int64_t) override { - folly::Promise<std::shared_ptr<RegionLocation>> promise; - promise.setValue(region_location_); - return promise.getFuture(); - } - - virtual void set_region_location(std::shared_ptr<RegionLocation> region_location) { - region_location_ = region_location; - } - - void UpdateCachedLocation(const RegionLocation &, const folly::exception_wrapper &) override {} - - protected: - std::shared_ptr<RegionLocation> region_location_; -}; - -class MockAsyncRegionLocator : public AsyncRegionLocatorBase { - public: - MockAsyncRegionLocator() : AsyncRegionLocatorBase() {} - explicit MockAsyncRegionLocator(std::shared_ptr<RegionLocation> region_location) - : AsyncRegionLocatorBase(region_location) {} - virtual ~MockAsyncRegionLocator() {} -}; - -class MockWrongRegionAsyncRegionLocator : public AsyncRegionLocatorBase { - private: - uint32_t tries_ = 0; - uint32_t num_fails_ = 0; - - public: - explicit MockWrongRegionAsyncRegionLocator(uint32_t num_fails) - : AsyncRegionLocatorBase(), num_fails_(num_fails) {} - explicit 
MockWrongRegionAsyncRegionLocator(std::shared_ptr<RegionLocation> region_location) - : AsyncRegionLocatorBase(region_location) {} - virtual ~MockWrongRegionAsyncRegionLocator() {} - - folly::Future<std::shared_ptr<hbase::RegionLocation>> LocateRegion( - const hbase::pb::TableName &tn, const std::string &row, - const RegionLocateType locate_type = RegionLocateType::kCurrent, - const int64_t locate_ns = 0) override { - // Fail for num_fails_ times, then delegate to the super class which will give the correct - // region location. - if (tries_++ > num_fails_) { - return AsyncRegionLocatorBase::LocateRegion(tn, row, locate_type, locate_ns); - } - folly::Promise<std::shared_ptr<RegionLocation>> promise; - /* set random region name, simulating invalid region */ - auto result = std::make_shared<RegionLocation>( - "whatever-region-name", region_location_->region_info(), region_location_->server_name()); - promise.setValue(result); - return promise.getFuture(); - } -}; - -class MockFailingAsyncRegionLocator : public AsyncRegionLocatorBase { - private: - uint32_t tries_ = 0; - uint32_t num_fails_ = 0; - - public: - explicit MockFailingAsyncRegionLocator(uint32_t num_fails) - : AsyncRegionLocatorBase(), num_fails_(num_fails) {} - explicit MockFailingAsyncRegionLocator(std::shared_ptr<RegionLocation> region_location) - : AsyncRegionLocatorBase(region_location) {} - virtual ~MockFailingAsyncRegionLocator() {} - folly::Future<std::shared_ptr<hbase::RegionLocation>> LocateRegion( - const hbase::pb::TableName &tn, const std::string &row, - const RegionLocateType locate_type = RegionLocateType::kCurrent, - const int64_t locate_ns = 0) override { - // Fail for num_fails_ times, then delegate to the super class which will give the correct - // region location. 
- if (tries_++ > num_fails_) { - return AsyncRegionLocatorBase::LocateRegion(tn, row, locate_type, locate_ns); - } - folly::Promise<std::shared_ptr<RegionLocation>> promise; - promise.setException(std::runtime_error{"Failed to look up region location"}); - return promise.getFuture(); - } -}; - -class MockAsyncConnection : public AsyncConnection, - public std::enable_shared_from_this<MockAsyncConnection> { - public: - MockAsyncConnection(std::shared_ptr<ConnectionConfiguration> conn_conf, - std::shared_ptr<folly::HHWheelTimer> retry_timer, - std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor, - std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor, - std::shared_ptr<wangle::IOThreadPoolExecutor> retry_executor, - std::shared_ptr<RpcClient> rpc_client, - std::shared_ptr<AsyncRegionLocator> region_locator) - : conn_conf_(conn_conf), - retry_timer_(retry_timer), - cpu_executor_(cpu_executor), - io_executor_(io_executor), - retry_executor_(retry_executor), - rpc_client_(rpc_client), - region_locator_(region_locator) {} - ~MockAsyncConnection() {} - void Init() { - caller_factory_ = - std::make_shared<AsyncRpcRetryingCallerFactory>(shared_from_this(), retry_timer_); - } - - std::shared_ptr<Configuration> conf() override { return nullptr; } - std::shared_ptr<ConnectionConfiguration> connection_conf() override { return conn_conf_; } - std::shared_ptr<AsyncRpcRetryingCallerFactory> caller_factory() override { - return caller_factory_; - } - std::shared_ptr<RpcClient> rpc_client() override { return rpc_client_; } - std::shared_ptr<AsyncRegionLocator> region_locator() override { return region_locator_; } - std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor() override { return cpu_executor_; } - std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor() override { return io_executor_; } - std::shared_ptr<wangle::IOThreadPoolExecutor> retry_executor() override { - return retry_executor_; - } - - void Close() override {} - 
std::shared_ptr<HBaseRpcController> CreateRpcController() override { - return std::make_shared<HBaseRpcController>(); - } - - private: - std::shared_ptr<folly::HHWheelTimer> retry_timer_; - std::shared_ptr<ConnectionConfiguration> conn_conf_; - std::shared_ptr<AsyncRpcRetryingCallerFactory> caller_factory_; - std::shared_ptr<RpcClient> rpc_client_; - std::shared_ptr<AsyncRegionLocator> region_locator_; - std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor_; - std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor_; - std::shared_ptr<wangle::IOThreadPoolExecutor> retry_executor_; -}; - -template <typename CONN> -class MockRawAsyncTableImpl { - public: - explicit MockRawAsyncTableImpl(std::shared_ptr<CONN> conn) : conn_(conn) {} - virtual ~MockRawAsyncTableImpl() = default; - - /* implement this in real RawAsyncTableImpl. */ - - /* in real RawAsyncTableImpl, this should be private. */ - folly::Future<std::shared_ptr<hbase::Result>> GetCall( - std::shared_ptr<hbase::RpcClient> rpc_client, std::shared_ptr<HBaseRpcController> controller, - std::shared_ptr<RegionLocation> loc, const hbase::Get &get) { - hbase::RpcCall<hbase::Request, hbase::Response> rpc_call = []( - std::shared_ptr<hbase::RpcClient> rpc_client, std::shared_ptr<RegionLocation> loc, - std::shared_ptr<HBaseRpcController> controller, - std::unique_ptr<hbase::Request> preq) -> folly::Future<std::unique_ptr<hbase::Response>> { - VLOG(1) << "entering MockRawAsyncTableImpl#GetCall, calling AsyncCall, loc:" - << loc->DebugString(); - return rpc_client->AsyncCall(loc->server_name().host_name(), loc->server_name().port(), - std::move(preq), User::defaultUser(), "ClientService"); - }; - - return Call<hbase::Get, hbase::Request, hbase::Response, std::shared_ptr<hbase::Result>>( - rpc_client, controller, loc, get, &hbase::RequestConverter::ToGetRequest, rpc_call, - &hbase::ResponseConverter::FromGetResponse); - } - - /* in real RawAsyncTableImpl, this should be private. 
*/ - template <typename REQ, typename PREQ, typename PRESP, typename RESP> - folly::Future<RESP> Call(std::shared_ptr<hbase::RpcClient> rpc_client, - std::shared_ptr<HBaseRpcController> controller, - std::shared_ptr<RegionLocation> loc, const REQ &req, - ReqConverter<std::unique_ptr<PREQ>, REQ, std::string> req_converter, - hbase::RpcCall<PREQ, PRESP> rpc_call, - RespConverter<RESP, PRESP> resp_converter) { - promise_ = std::make_shared<folly::Promise<std::shared_ptr<hbase::Result>>>(); - auto f = promise_->getFuture(); - VLOG(1) << "calling rpc_call"; - rpc_call(rpc_client, loc, controller, std::move(req_converter(req, loc->region_name()))) - .then([&, this, resp_converter](std::unique_ptr<PRESP> presp) { - VLOG(1) << "MockRawAsyncTableImpl#call succeded: "; - RESP result = resp_converter(*presp); - promise_->setValue(result); - }) - .onError([this](const exception_wrapper &e) { - VLOG(1) << "entering MockRawAsyncTableImpl#call, exception: " << e.what(); - VLOG(1) << "entering MockRawAsyncTableImpl#call, error typeinfo: " << typeid(e).name(); - promise_->setException(e); - }); - return f; - } - - private: - std::shared_ptr<CONN> conn_; - std::shared_ptr<folly::Promise<std::shared_ptr<hbase::Result>>> promise_; -}; - -void runTest(std::shared_ptr<AsyncRegionLocatorBase> region_locator, std::string tableName, - uint32_t operation_timeout_millis = 1200000) { - AsyncRpcRetryTest::test_util->CreateTable(tableName, "d"); - - // Create TableName and Row to be fetched from HBase - auto tn = folly::to<hbase::pb::TableName>(tableName); - auto row = "test2"; - - // Get to be performed on above HBase Table - hbase::Get get(row); - - // Create a client - Client client(*(AsyncRpcRetryTest::test_util->conf())); - - // Get connection to HBase Table - auto table = client.Table(tn); - - table->Put(Put{"test2"}.AddColumn("d", "2", "value2")); - table->Put(Put{"test2"}.AddColumn("d", "extra", "value for extra")); - - /* init region location and rpc channel */ - auto region_location = 
table->GetRegionLocation(row); - - // auto io_executor_ = std::make_shared<wangle::IOThreadPoolExecutor>(4); - auto cpu_executor_ = std::make_shared<wangle::CPUThreadPoolExecutor>(4); - auto io_executor_ = client.async_connection()->io_executor(); - auto retry_executor_ = std::make_shared<wangle::IOThreadPoolExecutor>(1); - auto codec = std::make_shared<hbase::KeyValueCodec>(); - auto rpc_client = std::make_shared<RpcClient>(io_executor_, cpu_executor_, codec, - AsyncRpcRetryTest::test_util->conf()); - // auto retry_event_base_ = std::make_shared<folly::ScopedEventBaseThread>(true); - std::shared_ptr<folly::HHWheelTimer> retry_timer = - folly::HHWheelTimer::newTimer(retry_executor_->getEventBase()); - - /* init connection configuration */ - auto connection_conf = std::make_shared<ConnectionConfiguration>( - TimeUtil::SecondsToNanos(20), // connect_timeout - TimeUtil::MillisToNanos(operation_timeout_millis), // operation_timeout - TimeUtil::SecondsToNanos(60), // rpc_timeout - TimeUtil::MillisToNanos(100), // pause - 5, // max retries - 9); // start log errors count - - /* set region locator */ - region_locator->set_region_location(region_location); - - /* init hbase client connection */ - auto conn = std::make_shared<MockAsyncConnection>(connection_conf, retry_timer, cpu_executor_, - io_executor_, retry_executor_, rpc_client, - region_locator); - conn->Init(); - - /* init retry caller factory */ - auto tableImpl = std::make_shared<MockRawAsyncTableImpl<MockAsyncConnection>>(conn); - - /* init request caller builder */ - auto builder = conn->caller_factory()->Single<std::shared_ptr<hbase::Result>>(); - - /* call with retry to get result */ - - auto async_caller = - builder->table(std::make_shared<hbase::pb::TableName>(tn)) - ->row(row) - ->rpc_timeout(conn->connection_conf()->read_rpc_timeout()) - ->operation_timeout(conn->connection_conf()->operation_timeout()) - ->action([=, &get](std::shared_ptr<hbase::HBaseRpcController> controller, - 
std::shared_ptr<hbase::RegionLocation> loc, - std::shared_ptr<hbase::RpcClient> rpc_client) - -> folly::Future<std::shared_ptr<hbase::Result>> { - return tableImpl->GetCall(rpc_client, controller, loc, get); - }) - ->Build(); - - auto promise = std::make_shared<folly::Promise<std::shared_ptr<hbase::Result>>>(); - - auto result = async_caller->Call().get(milliseconds(500000)); - - // Test the values, should be same as in put executed on hbase shell - ASSERT_TRUE(!result->IsEmpty()) << "Result shouldn't be empty."; - EXPECT_EQ("test2", result->Row()); - EXPECT_EQ("value2", *(result->Value("d", "2"))); - EXPECT_EQ("value for extra", *(result->Value("d", "extra"))); - - retry_timer->destroy(); - table->Close(); - client.Close(); - retry_executor_->stop(); -} - -// Test successful case -TEST_F(AsyncRpcRetryTest, TestGetBasic) { - std::shared_ptr<AsyncRegionLocatorBase> region_locator( - std::make_shared<MockAsyncRegionLocator>()); - runTest(region_locator, "table1"); -} - -// Tests the RPC failing 3 times, then succeeding -TEST_F(AsyncRpcRetryTest, TestHandleException) { - std::shared_ptr<AsyncRegionLocatorBase> region_locator( - std::make_shared<MockWrongRegionAsyncRegionLocator>(3)); - runTest(region_locator, "table2"); -} - -// Tests the RPC failing 5 times, throwing an exception -TEST_F(AsyncRpcRetryTest, TestFailWithException) { - std::shared_ptr<AsyncRegionLocatorBase> region_locator( - std::make_shared<MockWrongRegionAsyncRegionLocator>(5)); - EXPECT_ANY_THROW(runTest(region_locator, "table3")); -} - -// Tests the region location lookup failing 3 times, then succeeding -TEST_F(AsyncRpcRetryTest, TestHandleExceptionFromRegionLocationLookup) { - std::shared_ptr<AsyncRegionLocatorBase> region_locator( - std::make_shared<MockFailingAsyncRegionLocator>(3)); - runTest(region_locator, "table4"); -} - -// Tests the region location lookup failing 5 times, throwing an exception -TEST_F(AsyncRpcRetryTest, TestFailWithExceptionFromRegionLocationLookup) { - 
std::shared_ptr<AsyncRegionLocatorBase> region_locator( - std::make_shared<MockFailingAsyncRegionLocator>(5)); - EXPECT_ANY_THROW(runTest(region_locator, "table5")); -} - -// Tests hitting operation timeout, thus not retrying anymore -TEST_F(AsyncRpcRetryTest, TestFailWithOperationTimeout) { - std::shared_ptr<AsyncRegionLocatorBase> region_locator( - std::make_shared<MockFailingAsyncRegionLocator>(3)); - EXPECT_ANY_THROW(runTest(region_locator, "table6", 200)); -} http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/core/async-scan-rpc-retrying-caller.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/async-scan-rpc-retrying-caller.cc b/hbase-native-client/core/async-scan-rpc-retrying-caller.cc deleted file mode 100644 index a1e8362..0000000 --- a/hbase-native-client/core/async-scan-rpc-retrying-caller.cc +++ /dev/null @@ -1,448 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- * - */ - -#include "core/async-scan-rpc-retrying-caller.h" - -namespace hbase { - -ScanResumerImpl::ScanResumerImpl(std::shared_ptr<AsyncScanRpcRetryingCaller> caller) - : caller_(caller), mutex_() {} - -void ScanResumerImpl::Resume() { - // just used to fix findbugs warnings. In fact, if resume is called before prepare, then we - // just return at the first if condition without loading the resp and numValidResuls field. If - // resume is called after suspend, then it is also safe to just reference resp and - // numValidResults after the synchronized block as no one will change it anymore. - std::shared_ptr<pb::ScanResponse> local_resp; - int64_t local_num_complete_rows; - - { - std::unique_lock<std::mutex> mlock{mutex_}; - if (state_ == ScanResumerState::kInitialized) { - // user calls this method before we call prepare, so just set the state to - // RESUMED, the implementation will just go on. - state_ = ScanResumerState::kResumed; - return; - } - if (state_ == ScanResumerState::kResumed) { - // already resumed, give up. 
- return; - } - state_ = ScanResumerState::kResumed; - local_resp = resp_; - local_num_complete_rows = num_complete_rows_; - } - - caller_->CompleteOrNext(local_resp); -} - -bool ScanResumerImpl::Prepare(std::shared_ptr<pb::ScanResponse> resp, int num_complete_rows) { - std::unique_lock<std::mutex> mlock(mutex_); - if (state_ == ScanResumerState::kResumed) { - // user calls resume before we actually suspend the scan, just continue; - return false; - } - state_ = ScanResumerState::kSuspended; - resp_ = resp; - num_complete_rows_ = num_complete_rows; - - return true; -} - -ScanControllerImpl::ScanControllerImpl(std::shared_ptr<AsyncScanRpcRetryingCaller> caller) - : caller_(caller) {} - -std::shared_ptr<ScanResumer> ScanControllerImpl::Suspend() { - PreCheck(); - state_ = ScanControllerState::kSuspended; - resumer_ = std::make_shared<ScanResumerImpl>(caller_); - return resumer_; -} - -void ScanControllerImpl::Terminate() { - PreCheck(); - state_ = ScanControllerState::kTerminated; -} - -// return the current state, and set the state to DESTROYED. 
-ScanControllerState ScanControllerImpl::Destroy() { - ScanControllerState state = state_; - state_ = ScanControllerState::kDestroyed; - return state; -} - -void ScanControllerImpl::PreCheck() { - CHECK(std::this_thread::get_id() == caller_thread_id_) - << "The current thread is" << std::this_thread::get_id() << ", expected thread is " - << caller_thread_id_ << ", you should not call this method outside OnNext or OnHeartbeat"; - - CHECK(state_ == ScanControllerState::kInitialized) << "Invalid Stopper state " - << DebugString(state_); -} - -std::string ScanControllerImpl::DebugString(ScanControllerState state) { - switch (state) { - case ScanControllerState::kInitialized: - return "kInitialized"; - case ScanControllerState::kSuspended: - return "kSuspended"; - case ScanControllerState::kTerminated: - return "kTerminated"; - case ScanControllerState::kDestroyed: - return "kDestroyed"; - default: - return "UNKNOWN"; - } -} - -std::string ScanControllerImpl::DebugString(ScanResumerState state) { - switch (state) { - case ScanResumerState::kInitialized: - return "kInitialized"; - case ScanResumerState::kSuspended: - return "kSuspended"; - case ScanResumerState::kResumed: - return "kResumed"; - default: - return "UNKNOWN"; - } -} - -AsyncScanRpcRetryingCaller::AsyncScanRpcRetryingCaller( - std::shared_ptr<AsyncConnection> conn, std::shared_ptr<folly::HHWheelTimer> retry_timer, - std::shared_ptr<hbase::RpcClient> rpc_client, std::shared_ptr<Scan> scan, int64_t scanner_id, - std::shared_ptr<ScanResultCache> results_cache, std::shared_ptr<RawScanResultConsumer> consumer, - std::shared_ptr<RegionLocation> region_location, nanoseconds scanner_lease_timeout_nanos, - nanoseconds pause, uint32_t max_retries, nanoseconds scan_timeout_nanos, - nanoseconds rpc_timeout_nanos, uint32_t start_log_errors_count) - : conn_(conn), - retry_timer_(retry_timer), - rpc_client_(rpc_client), - scan_(scan), - scanner_id_(scanner_id), - results_cache_(results_cache), - consumer_(consumer), - 
region_location_(region_location), - scanner_lease_timeout_nanos_(scanner_lease_timeout_nanos), - pause_(pause), - max_retries_(max_retries), - scan_timeout_nanos_(scan_timeout_nanos), - rpc_timeout_nanos_(rpc_timeout_nanos), - start_log_errors_count_(start_log_errors_count), - promise_(std::make_shared<folly::Promise<bool>>()), - tries_(1) { - controller_ = conn_->CreateRpcController(); - start_ns_ = TimeUtil::GetNowNanos(); - max_attempts_ = ConnectionUtils::Retries2Attempts(max_retries); - exceptions_ = std::make_shared<std::vector<ThrowableWithExtraContext>>(); -} - -folly::Future<bool> AsyncScanRpcRetryingCaller::Start( - std::shared_ptr<HBaseRpcController> controller, - std::shared_ptr<pb::ScanResponse> open_scan_resp, - const std::shared_ptr<CellScanner> cell_scanner) { - OnComplete(controller, open_scan_resp, cell_scanner); - return promise_->getFuture(); -} - -int64_t AsyncScanRpcRetryingCaller::RemainingTimeNs() { - return scan_timeout_nanos_.count() - (TimeUtil::GetNowNanos() - start_ns_); -} - -void AsyncScanRpcRetryingCaller::OnComplete(std::shared_ptr<HBaseRpcController> controller, - std::shared_ptr<pb::ScanResponse> resp, - const std::shared_ptr<CellScanner> cell_scanner) { - VLOG(5) << "Scan: OnComplete, scanner_id:" << scanner_id_; - - if (controller->Failed()) { - OnError(controller->exception()); - return; - } - - bool is_heartbeat = resp->has_heartbeat_message() && resp->heartbeat_message(); - - int64_t num_complete_rows_before = results_cache_->num_complete_rows(); - try { - auto raw_results = ResponseConverter::FromScanResponse(resp, cell_scanner); - - auto results = results_cache_->AddAndGet(raw_results, is_heartbeat); - - auto scan_controller = std::make_shared<ScanControllerImpl>(shared_from_this()); - - if (results.size() > 0) { - UpdateNextStartRowWhenError(*results[results.size() - 1]); - VLOG(5) << "Calling consumer->OnNext()"; - consumer_->OnNext(results, scan_controller); - } else if (is_heartbeat) { - 
consumer_->OnHeartbeat(scan_controller); - } - - ScanControllerState state = scan_controller->Destroy(); - if (state == ScanControllerState::kTerminated) { - if (resp->has_more_results_in_region() && !resp->more_results_in_region()) { - // we have more results in region but user request to stop the scan, so we need to close the - // scanner explicitly. - CloseScanner(); - } - CompleteNoMoreResults(); - return; - } - - int64_t num_complete_rows = results_cache_->num_complete_rows() - num_complete_rows_before; - if (state == ScanControllerState::kSuspended) { - if (scan_controller->resumer()->Prepare(resp, num_complete_rows)) { - return; - } - } - } catch (const std::runtime_error& e) { - // We can not retry here. The server has responded normally and the call sequence has been - // increased so a new scan with the same call sequence will cause an - // OutOfOrderScannerNextException. Let the upper layer open a new scanner. - LOG(WARNING) << "Received exception in reading the scan response:" << e.what(); - CompleteWhenError(true); - return; - } - - CompleteOrNext(resp); -} - -void AsyncScanRpcRetryingCaller::CompleteOrNext(std::shared_ptr<pb::ScanResponse> resp) { - VLOG(5) << "Scan: CompleteOrNext, scanner_id" << scanner_id_ - << ", response:" << resp->ShortDebugString(); - - if (resp->has_more_results() && !resp->more_results()) { - // RS tells us there is no more data for the whole scan - CompleteNoMoreResults(); - return; - } - // TODO: Implement Scan::limit(), and check the limit here - - if (resp->has_more_results_in_region() && !resp->more_results_in_region()) { - // TODO: check whether Scan is reversed here - CompleteWhenNoMoreResultsInRegion(); - return; - } - Next(); -} - -void AsyncScanRpcRetryingCaller::CompleteExceptionally(bool close_scanner) { - VLOG(5) << "Scan: CompleteExceptionally"; - results_cache_->Clear(); - if (close_scanner) { - CloseScanner(); - } - this->promise_->setException(RetriesExhaustedException(tries_ - 1, exceptions_)); -} - -void 
AsyncScanRpcRetryingCaller::CompleteNoMoreResults() { - // In master code, scanners auto-close if we have exhausted the region. It may not be the case - // in branch-1 code. If this is backported, make sure that the scanner is closed. - VLOG(5) << "Scan: CompleteNoMoreResults, scanner_id:" << scanner_id_; - promise_->setValue(false); -} - -void AsyncScanRpcRetryingCaller::CompleteWhenNoMoreResultsInRegion() { - VLOG(5) << "Scan: CompleteWhenNoMoreResultsInRegion, scanner_id:" << scanner_id_; - // In master code, scanners auto-close if we have exhausted the region. It may not be the case - // in branch-1 code. If this is backported, make sure that the scanner is closed. - if (NoMoreResultsForScan(*scan_, region_location_->region_info())) { - CompleteNoMoreResults(); - } else { - CompleteWithNextStartRow(region_location_->region_info().end_key(), true); - } -} - -void AsyncScanRpcRetryingCaller::CompleteWithNextStartRow(std::string row, bool inclusive) { - VLOG(5) << "Scan: CompleteWithNextStartRow: region scan is complete, move to next region"; - scan_->SetStartRow(row); - // TODO: set inclusive if it is reverse scans - promise_->setValue(true); -} - -void AsyncScanRpcRetryingCaller::UpdateNextStartRowWhenError(const Result& result) { - next_start_row_when_error_ = optional<std::string>(result.Row()); - include_next_start_row_when_error_ = result.Partial(); -} - -void AsyncScanRpcRetryingCaller::CompleteWhenError(bool close_scanner) { - VLOG(5) << "Scan: CompleteWhenError, scanner_id:" << scanner_id_; - results_cache_->Clear(); - if (close_scanner) { - CloseScanner(); - } - if (next_start_row_when_error_) { - // TODO: HBASE-17583 adds include start / stop row to the Scan. Once we rebase and implement - // those options in Scan , we can start using that here. - scan_->SetStartRow(include_next_start_row_when_error_ - ? 
*next_start_row_when_error_ - : BytesUtil::CreateClosestRowAfter(*next_start_row_when_error_)); - } - promise_->setValue(true); -} - -void AsyncScanRpcRetryingCaller::OnError(const folly::exception_wrapper& error) { - VLOG(5) << "Scan: OnError, scanner_id:" << scanner_id_; - if (tries_ > start_log_errors_count_ || VLOG_IS_ON(5)) { - LOG(WARNING) << "Call to " << region_location_->server_name().ShortDebugString() - << " for scanner id = " << scanner_id_ << " for " - << region_location_->region_info().ShortDebugString() - << " failed, , tries = " << tries_ << ", maxAttempts = " << max_attempts_ - << ", timeout = " << TimeUtil::ToMillis(scan_timeout_nanos_).count() - << " ms, time elapsed = " << TimeUtil::ElapsedMillis(start_ns_) << " ms" - << error.what().toStdString(); - } - - bool scanner_closed = ExceptionUtil::IsScannerClosed(error); - ThrowableWithExtraContext twec(error, TimeUtil::GetNowNanos()); - exceptions_->push_back(twec); - if (tries_ >= max_retries_) { - CompleteExceptionally(!scanner_closed); - return; - } - - int64_t delay_ns; - if (scan_timeout_nanos_.count() > 0) { - int64_t max_delay_ns = RemainingTimeNs() - ConnectionUtils::kSleepDeltaNs; - if (max_delay_ns <= 0) { - CompleteExceptionally(!scanner_closed); - return; - } - delay_ns = std::min(max_delay_ns, ConnectionUtils::GetPauseTime(pause_.count(), tries_ - 1)); - } else { - delay_ns = ConnectionUtils::GetPauseTime(pause_.count(), tries_ - 1); - } - - if (scanner_closed) { - CompleteWhenError(false); - return; - } - - if (ExceptionUtil::IsScannerOutOfOrder(error)) { - CompleteWhenError(true); - return; - } - if (!ExceptionUtil::ShouldRetry(error)) { - CompleteExceptionally(true); - return; - } - tries_++; - - auto self(shared_from_this()); - conn_->retry_executor()->add([&]() { - retry_timer_->scheduleTimeoutFn( - [self]() { self->conn_->cpu_executor()->add([&]() { self->Call(); }); }, - std::chrono::milliseconds(TimeUtil::ToMillis(delay_ns))); - }); -} - -bool 
AsyncScanRpcRetryingCaller::NoMoreResultsForScan(const Scan& scan, - const pb::RegionInfo& info) { - if (BytesUtil::IsEmptyStopRow(info.end_key())) { - return true; - } - if (BytesUtil::IsEmptyStopRow(scan.StopRow())) { - return false; - } - int32_t c = BytesUtil::CompareTo(info.end_key(), scan.StopRow()); - // 1. if our stop row is less than the endKey of the region - // 2. if our stop row is equal to the endKey of the region and we do not include the stop row - // for scan. - return c > 0 || - (c == 0 /* && !scan.IncludeStopRow()*/); // TODO: Scans always exclude StopRow for now. -} - -void AsyncScanRpcRetryingCaller::Next() { - VLOG(5) << "Scan: Next"; - next_call_seq_++; - tries_ = 1; - exceptions_->clear(); - start_ns_ = TimeUtil::GetNowNanos(); - Call(); -} - -void AsyncScanRpcRetryingCaller::Call() { - VLOG(5) << "Scan: Call"; - auto self(shared_from_this()); - // As we have a call sequence for scan, it is useless to have a different rpc timeout which is - // less than the scan timeout. If the server does not respond in time(usually this will not - // happen as we have heartbeat now), we will get an OutOfOrderScannerNextException when - // resending the next request and the only way to fix this is to close the scanner and open a - // new one. 
- int64_t call_timeout_nanos; - if (scan_timeout_nanos_.count() > 0) { - int64_t remaining_nanos = scan_timeout_nanos_.count() - (TimeUtil::GetNowNanos() - start_ns_); - if (remaining_nanos <= 0) { - CompleteExceptionally(true); - return; - } - call_timeout_nanos = remaining_nanos; - } else { - call_timeout_nanos = 0L; - } - - ResetController(controller_, call_timeout_nanos); - - auto req = - RequestConverter::ToScanRequest(scanner_id_, scan_->Caching(), false, next_call_seq_, false); - - // do the RPC call - rpc_client_ - ->AsyncCall(region_location_->server_name().host_name(), - region_location_->server_name().port(), std::move(req), - security::User::defaultUser(), "ClientService") - .via(conn_->cpu_executor().get()) - .then([self, this](const std::unique_ptr<Response>& resp) { - auto scan_resp = std::static_pointer_cast<pb::ScanResponse>(resp->resp_msg()); - return OnComplete(controller_, scan_resp, resp->cell_scanner()); - }) - .onError([self, this](const folly::exception_wrapper& e) { OnError(e); }); -} - -void AsyncScanRpcRetryingCaller::CloseScanner() { - auto self(shared_from_this()); - ResetController(controller_, rpc_timeout_nanos_.count()); - - VLOG(5) << "Closing scanner with scanner_id:" << folly::to<std::string>(scanner_id_); - - // Do a close scanner RPC. Fire and forget. - auto req = RequestConverter::ToScanRequest(scanner_id_, 0, true); - rpc_client_ - ->AsyncCall(region_location_->server_name().host_name(), - region_location_->server_name().port(), std::move(req), - security::User::defaultUser(), "ClientService") - .onError([self, this](const folly::exception_wrapper& e) -> std::unique_ptr<Response> { - LOG(WARNING) << "Call to " + region_location_->server_name().ShortDebugString() + - " for closing scanner_id = " + folly::to<std::string>(scanner_id_) + - " for " + region_location_->region_info().ShortDebugString() + - " failed, ignore, probably already closed. 
Exception:" + - e.what().toStdString(); - return nullptr; - }); -} - -void AsyncScanRpcRetryingCaller::ResetController(std::shared_ptr<HBaseRpcController> controller, - const int64_t& timeout_nanos) { - controller->Reset(); - if (timeout_nanos >= 0) { - controller->set_call_timeout( - milliseconds(std::min(static_cast<int64_t>(INT_MAX), TimeUtil::ToMillis(timeout_nanos)))); - } -} - -} // namespace hbase http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/core/async-scan-rpc-retrying-caller.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/async-scan-rpc-retrying-caller.h b/hbase-native-client/core/async-scan-rpc-retrying-caller.h deleted file mode 100644 index 9555e80..0000000 --- a/hbase-native-client/core/async-scan-rpc-retrying-caller.h +++ /dev/null @@ -1,233 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- * - */ -#pragma once - -#include <folly/Conv.h> -#include <folly/ExceptionWrapper.h> -#include <folly/Format.h> -#include <folly/Logging.h> -#include <folly/futures/Future.h> -#include <folly/io/async/EventBase.h> -#include <folly/io/async/HHWheelTimer.h> - -#include <algorithm> -#include <chrono> -#include <functional> -#include <memory> -#include <string> -#include <type_traits> -#include <utility> -#include <vector> - -#include "connection/rpc-client.h" -#include "core/async-connection.h" -#include "core/hbase-rpc-controller.h" -#include "core/raw-scan-result-consumer.h" -#include "core/region-location.h" -#include "core/request-converter.h" -#include "core/response-converter.h" -#include "core/result.h" -#include "core/scan-result-cache.h" -#include "core/scan.h" -#include "exceptions/exception.h" -#include "if/Client.pb.h" -#include "if/HBase.pb.h" -#include "utils/bytes-util.h" -#include "utils/connection-util.h" -#include "utils/optional.h" -#include "utils/sys-util.h" -#include "utils/time-util.h" - -using std::chrono::nanoseconds; -using std::chrono::milliseconds; - -namespace hbase { - -class AsyncScanRpcRetryingCaller; - -// The resume method is allowed to be called in another thread so here we also use the -// ResumerState to prevent race. The initial state is INITIALIZED, and in most cases, when back -// from onNext or onHeartbeat, we will call the prepare method to change the state to SUSPENDED, -// and when user calls resume method, we will change the state to RESUMED. But the resume method -// could be called in other thread, and in fact, user could just do this: -// controller.suspend().resume() -// This is strange but valid. This means the scan could be resumed before we call the prepare -// method to do the actual suspend work. So in the resume method, we will check if the state is -// INTIALIZED, if it is, then we will just set the state to RESUMED and return. 
And in prepare
-// method, if the state is RESUMED already, we will just return and let the scan go on.
-// Notice that the public methods of this class are supposed to be called by the upper layer only,
-// and package private methods can only be called within the implementation of
-// AsyncScanSingleRegionRpcRetryingCaller.
-// TODO: Unlike the Java counterpart, we do not do scan lease renewals in a background thread.
-// Since there is also no async scan API exposed to the users, the only ScanResultConsumer is the
-// AsyncTableResultScanner which will only pause the scanner if the result cache is maxed. The
-// application is expected to consume the scan results before the scanner lease timeout.
-class ScanResumerImpl : public ScanResumer {
- public:
-  explicit ScanResumerImpl(std::shared_ptr<AsyncScanRpcRetryingCaller> caller);
-
-  virtual ~ScanResumerImpl() = default;
-
-  /**
-   * Resume the scan. You are free to call it multiple times but only the first call will take
-   * effect.
-   */
-  void Resume() override;
-
-  // Returns false if the scan has already been resumed. See the comment above for ScanResumerImpl
-  // for more details.
-  // NOTE(review): num_complete_rows is an int here while num_complete_rows_ is an int64_t.
-  bool Prepare(std::shared_ptr<pb::ScanResponse> resp, int num_complete_rows);
-
- private:
-  // INITIALIZED -> SUSPENDED -> RESUMED
-  // INITIALIZED -> RESUMED
-  ScanResumerState state_ = ScanResumerState::kInitialized;
-  // Presumably guards state_ (Resume() may be called from another thread, see the comment above
-  // the class) -- confirm against the .cc.
-  std::mutex mutex_;
-  // Presumably the values handed to Prepare(), kept until the scan is resumed -- confirm.
-  std::shared_ptr<pb::ScanResponse> resp_ = nullptr;
-  int64_t num_complete_rows_ = 0;
-  // The caller whose scan this resumer controls.
-  std::shared_ptr<AsyncScanRpcRetryingCaller> caller_;
-};
-
-// ScanController implementation bound to one AsyncScanRpcRetryingCaller. The state machine and
-// the single-thread restriction are described on the private members below.
-class ScanControllerImpl : public ScanController {
- public:
-  virtual ~ScanControllerImpl() = default;
-
-  explicit ScanControllerImpl(std::shared_ptr<AsyncScanRpcRetryingCaller> caller);
-
-  /**
-   * Suspend the scan.
-   * <p>
-   * This means we will stop fetching data in background, i.e., will not call onNext any more
-   * before you resume the scan.
-   * @return A resumer used to resume the scan later.
-   */
-  std::shared_ptr<ScanResumer> Suspend();
-
-  /**
-   * Terminate the scan.
-   * <p>
-   * This is useful when you have got enough results and want to stop the scan in onNext method,
-   * or you want to stop the scan in onHeartbeat method because it has spent too much time.
-   */
-  void Terminate();
-
-  // Returns the current state, and sets the state to DESTROYED.
-  ScanControllerState Destroy();
-
-  // The resumer stored in resumer_; nullptr until one has been assigned.
-  std::shared_ptr<ScanResumerImpl> resumer() { return resumer_; }
-
- private:
-  // NOTE(review): presumably validates caller_thread_id_ and state_ before a state change --
-  // the implementation is not visible in this header.
-  void PreCheck();
-
-  std::string DebugString(ScanControllerState state);
-
-  std::string DebugString(ScanResumerState state);
-
- private:
-  // Make sure the methods are only called in this thread (the thread that constructed the
-  // controller, since the id is captured by the default member initializer).
-  std::thread::id caller_thread_id_ = std::this_thread::get_id();
-  // INITIALIZED -> SUSPENDED -> DESTROYED
-  // INITIALIZED -> TERMINATED -> DESTROYED
-  // INITIALIZED -> DESTROYED
-  // If the state is incorrect we will throw IllegalStateException.
-  ScanControllerState state_ = ScanControllerState::kInitialized;
-  std::shared_ptr<ScanResumerImpl> resumer_ = nullptr;
-  std::shared_ptr<AsyncScanRpcRetryingCaller> caller_;
-};
-
-// Retrying caller for the scan RPCs of an already opened scanner (scanner_id) at a single
-// region location, delivering results to a RawScanResultConsumer. ScanResumerImpl and
-// ScanControllerImpl are friends of this class and can therefore reach its private members.
-class AsyncScanRpcRetryingCaller : public std::enable_shared_from_this<AsyncScanRpcRetryingCaller> {
- public:
-  AsyncScanRpcRetryingCaller(std::shared_ptr<AsyncConnection> conn,
-                             std::shared_ptr<folly::HHWheelTimer> retry_timer,
-                             std::shared_ptr<hbase::RpcClient> rpc_client,
-                             std::shared_ptr<Scan> scan, int64_t scanner_id,
-                             std::shared_ptr<ScanResultCache> results_cache,
-                             std::shared_ptr<RawScanResultConsumer> consumer,
-                             std::shared_ptr<RegionLocation> region_location,
-                             nanoseconds scanner_lease_timeout_nanos, nanoseconds pause,
-                             uint32_t max_retries, nanoseconds scan_timeout_nanos,
-                             nanoseconds rpc_timeout_nanos, uint32_t start_log_errors_count);
-
-  // Starts processing from the response of the call that opened the scanner (open_scan_resp).
-  // NOTE(review): the meaning of the bool carried by the returned future is not visible in this
-  // header -- confirm against the .cc before relying on it.
-  folly::Future<bool> Start(std::shared_ptr<HBaseRpcController> controller,
-                            std::shared_ptr<pb::ScanResponse> open_scan_resp,
-                            const std::shared_ptr<CellScanner> cell_scanner);
-
- private:
-  int64_t RemainingTimeNs();
-  void OnComplete(std::shared_ptr<HBaseRpcController> controller,
-                  std::shared_ptr<pb::ScanResponse> resp,
-                  const std::shared_ptr<CellScanner> cell_scanner);
-
-  void CompleteOrNext(std::shared_ptr<pb::ScanResponse> resp);
-
-  void CompleteExceptionally(bool close_scanner);
-
-  void CompleteNoMoreResults();
-
-  void CompleteWhenNoMoreResultsInRegion();
-
-  void CompleteWithNextStartRow(std::string row, bool inclusive);
-
-  void UpdateNextStartRowWhenError(const Result& result);
-
-  void CompleteWhenError(bool close_scanner);
-
-  void OnError(const folly::exception_wrapper& e);
-
-  bool NoMoreResultsForScan(const Scan& scan, const pb::RegionInfo& info);
-
-  void Next();
-
-  void Call();
-
-  void CloseScanner();
-
-  void ResetController(std::shared_ptr<HBaseRpcController> controller,
-                       const int64_t& timeout_nanos);
-
- private:
-  // Constructor arguments, stored under the same names.
-  std::shared_ptr<AsyncConnection> conn_;
-  std::shared_ptr<folly::HHWheelTimer> retry_timer_;
-  std::shared_ptr<hbase::RpcClient> rpc_client_;
-  std::shared_ptr<Scan> scan_;
-  int64_t scanner_id_;
-  std::shared_ptr<ScanResultCache> results_cache_;
-  std::shared_ptr<RawScanResultConsumer> consumer_;
-  std::shared_ptr<RegionLocation> region_location_;
-  nanoseconds scanner_lease_timeout_nanos_;
-  nanoseconds pause_;
-  uint32_t max_retries_;
-  nanoseconds scan_timeout_nanos_;
-  nanoseconds rpc_timeout_nanos_;
-  uint32_t start_log_errors_count_;
-  // Per-scan working state. Members without a default initializer here (start_ns_, tries_,
-  // max_attempts_) are presumably set by the constructor -- confirm in the .cc.
-  std::shared_ptr<folly::Promise<bool>> promise_;
-  std::shared_ptr<HBaseRpcController> controller_;
-  optional<std::string> next_start_row_when_error_ = optional<std::string>();
-  bool include_next_start_row_when_error_ = true;
-  uint64_t start_ns_;
-  uint32_t tries_;
-  std::shared_ptr<std::vector<ThrowableWithExtraContext>> exceptions_;
-  uint32_t max_attempts_;
-  int64_t next_call_seq_ = -1L;
-
-  friend class ScanResumerImpl;
-  friend class ScanControllerImpl;
-};
-
-}  // namespace hbase
http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/core/async-table-result-scanner.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/async-table-result-scanner.cc b/hbase-native-client/core/async-table-result-scanner.cc deleted file mode 100644 index b1935ae..0000000 --- a/hbase-native-client/core/async-table-result-scanner.cc +++ /dev/null @@ -1,161 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- *
- */
-
-#include "core/async-table-result-scanner.h"
-
-#include <thread>
-#include <vector>
-
-namespace hbase {
-/**
- * Creates an open scanner with an empty result cache.
- * @param max_cache_size once the estimated size of the cached results reaches this value,
- *        OnNext() suspends the scan; Next() resumes it when the cache has drained to half of it.
- */
-AsyncTableResultScanner::AsyncTableResultScanner(int64_t max_cache_size)
-    : cache_size_(0), max_cache_size_(max_cache_size), closed_(false) {}
-
-AsyncTableResultScanner::~AsyncTableResultScanner() { Close(); }
-
-/**
- * Closes the scanner: drops every cached result, resumes a suspended scan (its next
- * OnNext()/OnHeartbeat() callback then sees closed_ and terminates the scan) and wakes up all
- * threads blocked in Next().
- */
-void AsyncTableResultScanner::Close() {
-  std::shared_ptr<ScanResumer> local_resumer = nullptr;
-  {
-    std::unique_lock<std::mutex> mlock(mutex_);
-    closed_ = true;
-    while (!queue_.empty()) {
-      queue_.pop();
-    }
-    cache_size_ = 0;
-    local_resumer = resumer_;
-    resumer_ = nullptr;
-  }
-
-  // Same rule as in Next(): ScanResumer::Resume() has to be called without holding mutex_. The
-  // resumed scan can call back into OnNext()/OnHeartbeat() on this very thread, and both lock
-  // mutex_, which is not recursive.
-  if (local_resumer != nullptr) {
-    local_resumer->Resume();
-  }
-  cond_.notify_all();
-}
-
-/**
- * Returns the next cached result, blocking while the cache is empty.
- * @return the next result, or nullptr if the cache is empty and the scanner is closed
- *         (Close() or OnComplete() was called).
- * Throws the stored error if the cache is empty and OnError() has been called.
- */
-std::shared_ptr<Result> AsyncTableResultScanner::Next() {
-  VLOG(5) << "AsyncTableResultScanner: Next()";
-
-  std::shared_ptr<Result> result = nullptr;
-  std::shared_ptr<ScanResumer> local_resumer = nullptr;
-  {
-    std::unique_lock<std::mutex> mlock(mutex_);
-    while (queue_.empty()) {
-      if (closed_) {
-        return nullptr;
-      }
-      if (error_) {
-        // NOTE(review): this throws the folly::exception_wrapper object itself, not the exception
-        // it wraps, so a handler for the wrapped type will not match -- confirm this is intended.
-        throw error_;
-      }
-      cond_.wait(mlock);
-    }
-    result = queue_.front();
-    queue_.pop();
-
-    cache_size_ -= EstimatedSizeWithSharedPtr(result);
-    // Restart prefetching once the cache has drained to half of its limit.
-    if (resumer_ != nullptr && cache_size_ <= max_cache_size_ / 2) {
-      VLOG(1) << std::this_thread::get_id() << " resume scan prefetching";
-      local_resumer = resumer_;
-      resumer_ = nullptr;
-    }
-  }
-
-  // Need to call ScanResumer::Resume() outside of the scope of the mutex. The reason is that
-  // folly/wangle event loop might end up running the attached logic(.then()) at the Scan RPC
-  // in the same event thread before returning from the previous call. This seems like the
-  // wrong thing to do(™), but we cannot fix that now. Since the call back can end up calling
-  // this::OnNext(), we should unlock the mutex.
-  if (local_resumer != nullptr) {
-    local_resumer->Resume();
-  }
-  return result;
-}
-
-// Appends the results to the queue and adds their estimated size to cache_size_. Does not lock;
-// the only caller in this file, OnNext(), holds mutex_.
-void AsyncTableResultScanner::AddToCache(const std::vector<std::shared_ptr<Result>> &results) {
-  VLOG(5) << "AsyncTableResultScanner: AddToCache()";
-  // Iterate by reference: a by-value loop variable costs an extra shared_ptr copy per result.
-  for (const auto &r : results) {
-    queue_.push(r);
-    cache_size_ += EstimatedSizeWithSharedPtr(r);
-  }
-}
-
-// Size charged to the cache for one entry: the object's own estimate plus the shared_ptr itself.
-template <typename T>
-inline size_t AsyncTableResultScanner::EstimatedSizeWithSharedPtr(std::shared_ptr<T> t) {
-  return t->EstimatedSize() + sizeof(std::shared_ptr<T>);
-}
-
-/**
- * Caches a batch of results and wakes up threads blocked in Next(). Terminates the scan instead
- * if the scanner is already closed, and suspends prefetching when the cache is full.
- */
-void AsyncTableResultScanner::OnNext(const std::vector<std::shared_ptr<Result>> &results,
-                                     std::shared_ptr<ScanController> controller) {
-  VLOG(5) << "AsyncTableResultScanner: OnNext()";
-  {
-    std::unique_lock<std::mutex> mlock(mutex_);
-    if (closed_) {
-      controller->Terminate();
-      return;
-    }
-    AddToCache(results);
-
-    if (cache_size_ >= max_cache_size_) {
-      StopPrefetch(controller);
-    }
-  }
-  cond_.notify_all();
-}
-
-// Suspends the scan and keeps the returned resumer so that Next() or Close() can resume it.
-// Does not lock; the only caller in this file, OnNext(), holds mutex_.
-void AsyncTableResultScanner::StopPrefetch(std::shared_ptr<ScanController> controller) {
-  VLOG(1) << std::this_thread::get_id()
-          << ": stop prefetching when scanning as the cache size " +
-                 folly::to<std::string>(cache_size_) + " is greater than the max_cache_size " +
-                 folly::to<std::string>(max_cache_size_);
-
-  resumer_ = controller->Suspend();
-  num_prefetch_stopped_++;
-}
-
-/**
- * Indicate that there is a heartbeat message but we have not accumulated enough cells to call
- * onNext.
- * <p>
- * This method gives you a chance to terminate a slow scan operation.
- * @param controller used to suspend or terminate the scan. Notice that the {@code controller}
- *          instance is only valid within the scope of onHeartbeat method. You can only call its
- *          method in onHeartbeat, do NOT store it and call it later outside onHeartbeat.
- */
-void AsyncTableResultScanner::OnHeartbeat(std::shared_ptr<ScanController> controller) {
-  std::unique_lock<std::mutex> mlock(mutex_);
-  if (closed_) {
-    controller->Terminate();
-  }
-}
-
-/**
- * Indicate that we hit an unrecoverable error and the scan operation is terminated.
- * <p>
- * We will not call {@link #onComplete()} after calling {@link #onError(Throwable)}.
- */
-void AsyncTableResultScanner::OnError(const folly::exception_wrapper &error) {
-  LOG(WARNING) << "Scanner received error: " << error.what();
-  std::unique_lock<std::mutex> mlock(mutex_);
-  error_ = error;
-  cond_.notify_all();
-}
-
-/**
- * Indicate that the scan operation is completed normally.
- */
-void AsyncTableResultScanner::OnComplete() {
-  std::unique_lock<std::mutex> mlock(mutex_);
-  closed_ = true;
-  cond_.notify_all();
-}
-}  // namespace hbase
http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/core/async-table-result-scanner.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/async-table-result-scanner.h b/hbase-native-client/core/async-table-result-scanner.h
deleted file mode 100644
index dcdf871..0000000
--- a/hbase-native-client/core/async-table-result-scanner.h
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#pragma once
-
-#include <folly/Conv.h>
-#include <folly/ExceptionWrapper.h>
-#include <folly/Logging.h>
-#include <chrono>
-#include <condition_variable>
-#include <memory>
-#include <mutex>
-#include <queue>
-#include <string>
-#include <vector>
-
-#include "core/raw-scan-result-consumer.h"
-#include "core/result-scanner.h"
-#include "core/result.h"
-#include "if/Client.pb.h"
-#include "if/HBase.pb.h"
-
-namespace hbase {
-
-/**
- * A ResultScanner that is fed through the RawScanResultConsumer callbacks. Results delivered to
- * OnNext() are queued and handed out one at a time by Next(). The scan is suspended when the
- * estimated size of the queued results reaches max_cache_size and resumed by Next() once the
- * cache has drained to half of that limit.
- */
-class AsyncTableResultScanner : public ResultScanner, public RawScanResultConsumer {
- public:
-  explicit AsyncTableResultScanner(int64_t max_cache_size);
-
-  // Calls Close().
-  virtual ~AsyncTableResultScanner();
-
-  // Marks the scanner closed, drops the cached results, resumes a suspended scan and wakes up
-  // threads blocked in Next().
-  void Close() override;
-
-  // Blocks until a result is available. Returns nullptr when the cache is empty and the scanner
-  // is closed; throws the stored error when the cache is empty and OnError() has been called.
-  std::shared_ptr<Result> Next() override;
-
-  // Caches the results and wakes up threads blocked in Next(). Terminates the scan through the
-  // controller if the scanner is already closed, and suspends it when the cache is full.
-  void OnNext(const std::vector<std::shared_ptr<Result>> &results,
-              std::shared_ptr<ScanController> controller) override;
-
-  /**
-   * Indicate that there is a heartbeat message but we have not accumulated enough cells to call
-   * onNext.
-   * <p>
-   * This method gives you a chance to terminate a slow scan operation.
-   * @param controller used to suspend or terminate the scan. Notice that the {@code controller}
-   *          instance is only valid within the scope of onHeartbeat method. You can only call its
-   *          method in onHeartbeat, do NOT store it and call it later outside onHeartbeat.
-   */
-  void OnHeartbeat(std::shared_ptr<ScanController> controller) override;
-
-  /**
-   * Indicate that we hit an unrecoverable error and the scan operation is terminated.
-   * <p>
-   * We will not call {@link #onComplete()} after calling {@link #onError(Throwable)}.
-   */
-  void OnError(const folly::exception_wrapper &error) override;
-
-  /**
-   * Indicate that the scan operation is completed normally.
-   */
-  void OnComplete() override;
-
-  // For testing: the number of times prefetching has been stopped. Read without taking mutex_.
-  uint32_t num_prefetch_stopped() { return num_prefetch_stopped_; }
-
- private:
-  // Queues the results and adds their estimated size to cache_size_.
-  void AddToCache(const std::vector<std::shared_ptr<Result>> &results);
-
-  // t->EstimatedSize() plus the size of the shared_ptr itself.
-  template <typename T>
-  inline size_t EstimatedSizeWithSharedPtr(std::shared_ptr<T> t);
-
-  // Suspends the scan through the controller and stores the returned resumer in resumer_.
-  void StopPrefetch(std::shared_ptr<ScanController> controller);
-
- private:
-  // Results received from OnNext() and not yet returned by Next().
-  std::queue<std::shared_ptr<Result>> queue_;
-  std::mutex mutex_;
-  // Signalled when results arrive, on error, on completion and on Close().
-  std::condition_variable cond_;
-  // Set by OnError().
-  folly::exception_wrapper error_;
-  // Estimated size of everything currently in queue_.
-  int64_t cache_size_;
-  int64_t max_cache_size_;
-  // Set by Close() and OnComplete().
-  bool closed_;
-  // Non-null only while the scan is suspended because the cache was full.
-  std::shared_ptr<ScanResumer> resumer_ = nullptr;
-  uint32_t num_prefetch_stopped_ = 0;
-};
-}  // namespace hbase
http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/core/cell-test.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/cell-test.cc b/hbase-native-client/core/cell-test.cc
deleted file mode 100644
index 4611473..0000000
--- a/hbase-native-client/core/cell-test.cc
+++ /dev/null
@@ -1,195 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "core/cell.h"
-
-#include <glog/logging.h>
-#include <gtest/gtest.h>
-#include <limits>
-#include <memory>
-#include <string>
-
-using hbase::Cell;
-using hbase::CellType;
-
-// A cell built from explicit values reports exactly those values through its accessors.
-TEST(CellTest, Constructor) {
-  std::string row = "row-value";
-  std::string family = "family-value";
-  std::string column = "column-value";
-  std::string value = "value-value";
-  int64_t timestamp = std::numeric_limits<int64_t>::max();
-  CellType cell_type = CellType::PUT;
-
-  Cell cell{row, family, column, timestamp, value, cell_type};
-
-  EXPECT_EQ(row, cell.Row());
-  EXPECT_EQ(family, cell.Family());
-  EXPECT_EQ(column, cell.Qualifier());
-  EXPECT_EQ(value, cell.Value());
-  EXPECT_EQ(timestamp, cell.Timestamp());
-  EXPECT_EQ(cell_type, cell.Type());
-}
-
-// A copy-constructed cell stays valid after the source cell has been destroyed.
-TEST(CellTest, CopyConstructor) {
-  std::string row = "row-value";
-  std::string family = "family-value";
-  std::string column = "column-value";
-  std::string value = "value-value";
-  int64_t timestamp = std::numeric_limits<int64_t>::max();
-  CellType cell_type = CellType::PUT;
-
-  auto cell = std::make_unique<Cell>(row, family, column, timestamp, value, cell_type);
-  Cell cell2{*cell};
-  cell = nullptr;
-
-  EXPECT_EQ(row, cell2.Row());
-  EXPECT_EQ(family, cell2.Family());
-  EXPECT_EQ(column, cell2.Qualifier());
-  EXPECT_EQ(value, cell2.Value());
-  EXPECT_EQ(timestamp, cell2.Timestamp());
-  EXPECT_EQ(cell_type, cell2.Type());
-}
-
-// NOTE(review): `Cell cell2 = *cell;` is copy-initialization, so this exercises the copy
-// constructor again rather than operator=. To cover assignment, construct cell2 with other
-// values first and then assign -- once it is confirmed that Cell is copy-assignable.
-TEST(CellTest, CopyAssignment) {
-  std::string row = "row-value";
-  std::string family = "family-value";
-  std::string column = "column-value";
-  std::string value = "value-value";
-  int64_t timestamp = std::numeric_limits<int64_t>::max();
-  CellType cell_type = CellType::PUT;
-
-  auto cell = std::make_unique<Cell>(row, family, column, timestamp, value, cell_type);
-  Cell cell2 = *cell;
-  cell = nullptr;
-
-  EXPECT_EQ(row, cell2.Row());
-  EXPECT_EQ(family, cell2.Family());
-  EXPECT_EQ(column, cell2.Qualifier());
-  EXPECT_EQ(value, cell2.Value());
-  EXPECT_EQ(timestamp, cell2.Timestamp());
-  EXPECT_EQ(cell_type, cell2.Type());
-}
-
-// Empty qualifier and value, single-character family.
-TEST(CellTest, CellRowTest) {
-  std::string row = "only-row";
-  std::string family = "D";
-  std::string column = "";
-  std::string value = "";
-  int64_t timestamp = std::numeric_limits<int64_t>::max();
-  CellType cell_type = CellType::PUT;
-  Cell cell{row, family, column, timestamp, value, cell_type};
-
-  EXPECT_EQ(row, cell.Row());
-  EXPECT_EQ(family, cell.Family());
-  EXPECT_EQ(column, cell.Qualifier());
-  EXPECT_EQ(value, cell.Value());
-  EXPECT_EQ(timestamp, cell.Timestamp());
-  EXPECT_EQ(cell_type, cell.Type());
-}
-
-// Empty qualifier and value.
-TEST(CellTest, CellRowFamilyTest) {
-  std::string row = "only-row";
-  std::string family = "only-family";
-  std::string column = "";
-  std::string value = "";
-  int64_t timestamp = std::numeric_limits<int64_t>::max();
-  CellType cell_type = CellType::PUT;
-  Cell cell{row, family, column, timestamp, value, cell_type};
-
-  EXPECT_EQ(row, cell.Row());
-  EXPECT_EQ(family, cell.Family());
-  EXPECT_EQ(column, cell.Qualifier());
-  EXPECT_EQ(value, cell.Value());
-  EXPECT_EQ(timestamp, cell.Timestamp());
-  EXPECT_EQ(cell_type, cell.Type());
-}
-
-// Empty qualifier only.
-TEST(CellTest, CellRowFamilyValueTest) {
-  std::string row = "only-row";
-  std::string family = "only-family";
-  std::string column = "";
-  std::string value = "only-value";
-  int64_t timestamp = std::numeric_limits<int64_t>::max();
-  CellType cell_type = CellType::PUT;
-
-  Cell cell{row, family, column, timestamp, value, cell_type};
-
-  EXPECT_EQ(row, cell.Row());
-  EXPECT_EQ(family, cell.Family());
-  EXPECT_EQ(column, cell.Qualifier());
-  EXPECT_EQ(value, cell.Value());
-  EXPECT_EQ(timestamp, cell.Timestamp());
-  EXPECT_EQ(cell_type, cell.Type());
-}
-
-// Every field populated.
-TEST(CellTest, CellRowFamilyColumnValueTest) {
-  std::string row = "only-row";
-  std::string family = "only-family";
-  std::string column = "only-column";
-  std::string value = "only-value";
-  int64_t timestamp = std::numeric_limits<int64_t>::max();
-  CellType cell_type = CellType::PUT;
-  Cell cell{row, family, column, timestamp, value, cell_type};
-
-  EXPECT_EQ(row, cell.Row());
-  EXPECT_EQ(family, cell.Family());
-  EXPECT_EQ(column, cell.Qualifier());
-  EXPECT_EQ(value, cell.Value());
-  EXPECT_EQ(timestamp, cell.Timestamp());
-  EXPECT_EQ(cell_type, cell.Type());
-}
-
-// Pins the DebugString() format, including the LATEST_TIMESTAMP rendering of int64 max and the
-// output for an empty family.
-TEST(CellTest, CellDebugString) {
-  CellType cell_type = CellType::PUT;
-  std::string row = "row";
-  std::string family = "family";
-  std::string column = "column";
-  std::string value = "value";
-  int64_t timestamp = std::numeric_limits<int64_t>::max();
-
-  Cell cell{row, family, column, timestamp, value, cell_type};
-  LOG(INFO) << cell.DebugString();
-  EXPECT_EQ("row/family:column/LATEST_TIMESTAMP/PUT/vlen=5/seqid=0", cell.DebugString());
-
-  Cell cell2{row, "", column, 42, value, CellType::DELETE};
-  LOG(INFO) << cell2.DebugString();
-  EXPECT_EQ("row/column/42/DELETE/vlen=5/seqid=0", cell2.DebugString());
-}
-
-// EstimatedSize() exceeds sizeof(Cell), grows with the stored string data, and is the same
-// whichever string field carries the extra byte and whatever the cell type is.
-TEST(CellTest, CellEstimatedSize) {
-  CellType cell_type = CellType::PUT;
-  int64_t timestamp = std::numeric_limits<int64_t>::max();
-
-  Cell empty{"a", "a", "", timestamp, "", cell_type};
-  Cell cell1{"aa", "a", "", timestamp, "", cell_type};
-  Cell cell2{"a", "aa", "", timestamp, "", cell_type};
-  Cell cell3{"a", "a", "a", timestamp, "", cell_type};
-  Cell cell4{"a", "a", "", timestamp, "a", cell_type};
-  Cell cell5{"a", "a", "", timestamp, "a", CellType::DELETE};
-  Cell cell6{"aaaaaa", "a", "", timestamp, "a", cell_type};
-
-  LOG(INFO) << empty.EstimatedSize();
-  LOG(INFO) << cell1.EstimatedSize();
-
-  // EXPECT_GT prints both operands on failure, unlike EXPECT_TRUE(a > b).
-  EXPECT_GT(empty.EstimatedSize(), sizeof(Cell));
-  EXPECT_GT(cell1.EstimatedSize(), empty.EstimatedSize());
-  EXPECT_EQ(cell1.EstimatedSize(), cell2.EstimatedSize());
-  EXPECT_EQ(cell2.EstimatedSize(), cell3.EstimatedSize());
-  EXPECT_EQ(cell3.EstimatedSize(), cell4.EstimatedSize());
-  EXPECT_EQ(cell4.EstimatedSize(), cell5.EstimatedSize());
-  EXPECT_GT(cell6.EstimatedSize(), cell1.EstimatedSize());
-}