Repository: hbase Updated Branches: refs/heads/HBASE-14850 727d9b798 -> 05b59e8d4
HBASE-18578 [C++] Add pause for RPC test Signed-off-by: Enis Soztutar <e...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/05b59e8d Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/05b59e8d Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/05b59e8d Branch: refs/heads/HBASE-14850 Commit: 05b59e8d4eb819eb1f9f192c7d72e74e39088e16 Parents: 727d9b7 Author: Xiaobing Zhou <xz...@hortonworks.com> Authored: Tue Aug 22 12:01:21 2017 -0700 Committer: Enis Soztutar <e...@apache.org> Committed: Tue Aug 22 12:02:39 2017 -0700 ---------------------------------------------------------------------- .../connection/rpc-test-server.cc | 9 ++- hbase-native-client/connection/rpc-test.cc | 61 ++++++++++++++++---- 2 files changed, 59 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/05b59e8d/hbase-native-client/connection/rpc-test-server.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/rpc-test-server.cc b/hbase-native-client/connection/rpc-test-server.cc index 6132fbb..707bca7 100644 --- a/hbase-native-client/connection/rpc-test-server.cc +++ b/hbase-native-client/connection/rpc-test-server.cc @@ -88,7 +88,14 @@ Future<std::unique_ptr<Response>> RpcTestService::operator()(std::unique_ptr<Req response->set_exception(folly::make_exception_wrapper<RpcTestException>("server error!")); } else if (method_name == "pause") { - // TODO: + auto pb_resp_msg = std::make_shared<EmptyResponseProto>(); + /* sleeping */ + auto pb_req_msg = std::static_pointer_cast<PauseRequestProto>(request->req_msg()); + std::this_thread::sleep_for(std::chrono::milliseconds(pb_req_msg->ms())); + response->set_resp_msg(pb_resp_msg); + VLOG(1) << "RPC server:" + << " pause called, " << pb_req_msg->ms() << " ms"; + } else if (method_name == "addr") { // TODO: } else if (method_name == "socketNotOpen") { http://git-wip-us.apache.org/repos/asf/hbase/blob/05b59e8d/hbase-native-client/connection/rpc-test.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/rpc-test.cc b/hbase-native-client/connection/rpc-test.cc index 4688950..d541397 100644 --- a/hbase-native-client/connection/rpc-test.cc +++ b/hbase-native-client/connection/rpc-test.cc @@ -30,6 +30,7 @@ #include <glog/logging.h> #include <gtest/gtest.h> #include <boost/thread.hpp> +#include <chrono> #include "connection/rpc-client.h" #include "exceptions/exception.h" @@ -41,11 +42,14 @@ using namespace wangle; using namespace folly; using namespace hbase; +using namespace std::chrono; DEFINE_int32(port, 0, "test server port"); DEFINE_string(result_format, "RPC {} returned: {}.", "output format of RPC result"); -DEFINE_string(fail_format, "Shouldn't get here, exception is expected for RPC {}.", - "output format of enforcing fail"); +DEFINE_string(fail_ex_format, "Shouldn't get here, exception is expected for RPC {}.", + "output format of enforcing fail with exception"); +DEFINE_string(fail_no_ex_format, "Shouldn't get here, exception is not expected for RPC {}.", + "output format of enforcing fail without exception"); typedef ServerBootstrap<RpcTestServerSerializePipeline> ServerTestBootstrap; typedef std::shared_ptr<ServerTestBootstrap> ServerPtr; @@ -110,8 +114,8 @@ TEST_F(RpcTest, Ping) { VLOG(1) << folly::sformat(FLAGS_result_format, method, ""); }) .onError([&](const folly::exception_wrapper& ew) { - FAIL() << folly::sformat(FLAGS_fail_format, method); - }); + FAIL() << folly::sformat(FLAGS_fail_no_ex_format, method); + }).get(); server->stop(); server->join(); @@ -144,8 +148,8 @@ TEST_F(RpcTest, Echo) { EXPECT_EQ(greetings, pb_resp->message()); }) .onError([&](const folly::exception_wrapper& ew) { - FAIL() << folly::sformat(FLAGS_fail_format, method); - }); + FAIL() << folly::sformat(FLAGS_fail_no_ex_format, method); + }).get(); server->stop(); server->join(); @@ -168,7 +172,7 @@ TEST_F(RpcTest, Error) { ->AsyncCall(server_addr->getAddressStr(), server_addr->getPort(), std::move(request), hbase::security::User::defaultUser()) .then([&](std::unique_ptr<Response> response) { - FAIL() << folly::sformat(FLAGS_fail_format, method); + FAIL() << folly::sformat(FLAGS_fail_ex_format, method); }) .onError([&](const folly::exception_wrapper& ew) { VLOG(1) << folly::sformat(FLAGS_result_format, method, ew.what()); @@ -184,7 +188,7 @@ TEST_F(RpcTest, Error) { EXPECT_EQ(kRpcTestException, e.exception_class_name()); EXPECT_EQ(kRpcTestException + ": server error!", e.stack_trace()); })); - }); + }).get(); server->stop(); server->join(); @@ -208,7 +212,7 @@ TEST_F(RpcTest, SocketNotOpen) { ->AsyncCall(server_addr->getAddressStr(), server_addr->getPort(), std::move(request), hbase::security::User::defaultUser()) .then([&](std::unique_ptr<Response> response) { - FAIL() << folly::sformat(FLAGS_fail_format, method); + FAIL() << folly::sformat(FLAGS_fail_ex_format, method); }) .onError([&](const folly::exception_wrapper& ew) { VLOG(1) << folly::sformat(FLAGS_result_format, method, ew.what()); @@ -231,5 +235,42 @@ TEST_F(RpcTest, SocketNotOpen) { EXPECT_EQ(111 /*ECONNREFUSED*/, ase.getErrno()); }); })); - }); + }).get(); +} + +/** + * test pause + */ +TEST_F(RpcTest, Pause) { + int ms = 500; + + auto conf = CreateConf(); + auto server = CreateRpcServer(); + auto server_addr = GetRpcServerAddress(server); + auto client = + CreateRpcClient(conf, std::chrono::duration_cast<nanoseconds>(milliseconds(2 * ms))); + + auto method = "pause"; + auto request = std::make_unique<Request>(std::make_shared<PauseRequestProto>(), + std::make_shared<EmptyResponseProto>(), method); + auto pb_msg = std::static_pointer_cast<PauseRequestProto>(request->req_msg()); + + pb_msg->set_ms(ms); + + /* sending out request */ + client + ->AsyncCall(server_addr->getAddressStr(), server_addr->getPort(), std::move(request), + hbase::security::User::defaultUser()) + .then([&](std::unique_ptr<Response> response) { + auto pb_resp = std::static_pointer_cast<EmptyResponseProto>(response->resp_msg()); + EXPECT_TRUE(pb_resp != nullptr); + VLOG(1) << folly::sformat(FLAGS_result_format, method, ""); + }) + .onError([&](const folly::exception_wrapper& ew) { + VLOG(1) << folly::sformat(FLAGS_result_format, method, ew.what()); + FAIL() << folly::sformat(FLAGS_fail_no_ex_format, method); + }).get(); + + server->stop(); + server->join(); }