http://git-wip-us.apache.org/repos/asf/hadoop/blob/eefe2a14/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection_impl.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection_impl.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection_impl.h index 8e579a2..1dd43af 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection_impl.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection_impl.h @@ -26,6 +26,7 @@ #include "common/logging.h" #include "common/util.h" #include "common/libhdfs_events_impl.h" +#include "hdfspp/ioservice.h" #include <asio/connect.hpp> #include <asio/read.hpp> @@ -76,8 +77,8 @@ template <class Socket> RpcConnectionImpl<Socket>::RpcConnectionImpl(std::shared_ptr<RpcEngine> engine) : RpcConnection(engine), options_(engine->options()), - socket_(engine->io_service()), - connect_timer_(engine->io_service()) + socket_(engine->io_service()->GetRaw()), + connect_timer_(engine->io_service()->GetRaw()) { LOG_TRACE(kRPC, << "RpcConnectionImpl::RpcConnectionImpl called &" << (void*)this); } @@ -353,7 +354,7 @@ void RpcConnectionImpl<Socket>::FlushPendingRequests() { OnSendCompleted(ec, size); }); } else { // Nothing to send for this request, inform the handler immediately - ::asio::io_service *service = GetIoService(); + std::shared_ptr<IoService> service = GetIoService(); if(!service) { LOG_ERROR(kRPC, << "RpcConnectionImpl@" << this << " attempted to access null IoService"); // No easy way to bail out of this context, but the only way to get here is when @@ -361,7 +362,7 @@ void RpcConnectionImpl<Socket>::FlushPendingRequests() { return; } - service->post( + service->PostTask( // Never hold locks when calling a callback [req]() { req->OnResponseArrived(nullptr, Status::OK()); } );
http://git-wip-us.apache.org/repos/asf/hadoop/blob/eefe2a14/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc index 0ca7c6a..ad6c9b9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc @@ -30,7 +30,7 @@ template <class T> using optional = std::experimental::optional<T>; -RpcEngine::RpcEngine(::asio::io_service *io_service, const Options &options, +RpcEngine::RpcEngine(std::shared_ptr<IoService> io_service, const Options &options, const std::string &client_name, const std::string &user_name, const char *protocol_name, int protocol_version) : io_service_(io_service), @@ -40,7 +40,7 @@ RpcEngine::RpcEngine(::asio::io_service *io_service, const Options &options, protocol_name_(protocol_name), protocol_version_(protocol_version), call_id_(0), - retry_timer(*io_service), + retry_timer(io_service->GetRaw()), event_handlers_(std::make_shared<LibhdfsEvents>()), connect_canceled_(false) { @@ -86,7 +86,7 @@ bool RpcEngine::CancelPendingConnect() { void RpcEngine::Shutdown() { LOG_DEBUG(kRPC, << "RpcEngine::Shutdown called"); - io_service_->post([this]() { + io_service_->PostLambda([this]() { std::lock_guard<std::mutex> state_lock(engine_state_lock_); conn_.reset(); }); @@ -154,7 +154,7 @@ void RpcEngine::AsyncRpc( // In case user-side code isn't checking the status of Connect before doing RPC if(connect_canceled_) { - io_service_->post( + io_service_->PostLambda( [handler](){ handler(Status::Canceled()); } ); return; @@ -190,7 +190,7 @@ void RpcEngine::AsyncRpcCommsError( std::vector<std::shared_ptr<Request>> pendingRequests) { LOG_ERROR(kRPC, << "RpcEngine::AsyncRpcCommsError called; status=\"" << status.ToString() << "\" conn=" << failedConnection.get() << " reqs=" << std::to_string(pendingRequests.size())); - io_service().post([this, status, failedConnection, pendingRequests]() { + io_service_->PostLambda([this, status, failedConnection, pendingRequests]() { RpcCommsError(status, failedConnection, pendingRequests); }); } @@ -238,7 +238,7 @@ void RpcEngine::RpcCommsError( // on. There might be a good argument for caching the first error // rather than the last one, that gets messy - io_service().post([req, status]() { + io_service()->PostLambda([req, status]() { req->OnResponseArrived(nullptr, status); // Never call back while holding a lock }); it = pendingRequests.erase(it); @@ -283,7 +283,7 @@ void RpcEngine::RpcCommsError( for(unsigned int i=0; i<pendingRequests.size(); i++) { std::shared_ptr<Request> sharedCurrentRequest = pendingRequests[i]; - io_service().post([sharedCurrentRequest, badEndpointStatus]() { + io_service()->PostLambda([sharedCurrentRequest, badEndpointStatus]() { sharedCurrentRequest->OnResponseArrived(nullptr, badEndpointStatus); // Never call back while holding a lock }); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/eefe2a14/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h index 9f45fcf..845eaf5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h @@ -60,6 +60,7 @@ class RpcConnection; class SaslProtocol; class RpcConnection; class Request; +class IoService; /* * These methods of the RpcEngine will never acquire locks, and are safe for @@ -83,7 +84,7 @@ public: virtual const std::string &user_name() = 0; virtual const std::string &protocol_name() = 0; virtual int protocol_version() = 0; - virtual ::asio::io_service &io_service() = 0; + virtual std::shared_ptr<IoService> io_service() const = 0; virtual const Options &options() = 0; }; @@ -107,7 +108,7 @@ class RpcEngine : public LockFreeRpcEngine, public std::enable_shared_from_this< kCallIdSasl = -33 }; - RpcEngine(::asio::io_service *io_service, const Options &options, + RpcEngine(std::shared_ptr<IoService> service, const Options &options, const std::string &client_name, const std::string &user_name, const char *protocol_name, int protocol_version); @@ -145,7 +146,7 @@ class RpcEngine : public LockFreeRpcEngine, public std::enable_shared_from_this< const std::string &user_name() override { return auth_info_.getUser(); } const std::string &protocol_name() override { return protocol_name_; } int protocol_version() override { return protocol_version_; } - ::asio::io_service &io_service() override { return *io_service_; } + std::shared_ptr<IoService> io_service() const override { return io_service_; } const Options &options() override { return options_; } static std::string GetRandomClientName(); @@ -162,7 +163,7 @@ protected: std::vector<::asio::ip::tcp::endpoint> last_endpoints_; private: - ::asio::io_service * const io_service_; + mutable std::shared_ptr<IoService> io_service_; const Options options_; const std::string client_name_; const std::string client_id_; http://git-wip-us.apache.org/repos/asf/hadoop/blob/eefe2a14/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/bad_datanode_test.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/bad_datanode_test.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/bad_datanode_test.cc index 00bbf3d..23de015 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/bad_datanode_test.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/bad_datanode_test.cc @@ -16,11 +16,12 @@ * limitations under the License. */ -#include "fs/filesystem.h" -#include "fs/bad_datanode_tracker.h" #include "common/libhdfs_events_impl.h" - #include "common/util.h" +#include "fs/filesystem.h" +#include "fs/filehandle.h" +#include "fs/bad_datanode_tracker.h" +#include "reader/block_reader.h" #include <gmock/gmock.h> @@ -54,7 +55,7 @@ public: const std::string & client_name, const hadoop::hdfs::LocatedBlockProto &block, size_t offset, - const MutableBuffers &buffers, + const MutableBuffer &buffer, const std::function<void(const Status &, size_t)> handler)); virtual void CancelOperation() override { @@ -67,14 +68,14 @@ class MockDNConnection : public DataNodeConnection, public std::enable_shared_fr handler(Status::OK(), shared_from_this()); } - void async_read_some(const MutableBuffers &buf, + void async_read_some(const MutableBuffer &buf, std::function<void (const asio::error_code & error, std::size_t bytes_transferred) > handler) override { (void)buf; handler(asio::error::fault, 0); } - void async_write_some(const ConstBuffers &buf, + void async_write_some(const ConstBuffer &buf, std::function<void (const asio::error_code & error, std::size_t bytes_transferred) > handler) override { (void)buf; @@ -101,7 +102,7 @@ protected: return mock_reader_; } std::shared_ptr<DataNodeConnection> CreateDataNodeConnection( - ::asio::io_service *io_service, + std::shared_ptr<IoService> io_service, const ::hadoop::hdfs::DatanodeInfoProto & dn, const hadoop::common::TokenProto * token) override { (void) io_service; (void) dn; (void) token; @@ -130,12 +131,12 @@ TEST(BadDataNodeTest, TestNoNodes) { char buf[4096] = { 0, }; - IoServiceImpl io_service; + std::shared_ptr<IoService> io_service = IoService::MakeShared(); auto bad_node_tracker = std::make_shared<BadDataNodeTracker>(); auto monitors = std::make_shared<LibhdfsEvents>(); bad_node_tracker->AddBadNode("foo"); - PartialMockFileHandle is("cluster", "file", &io_service.io_service(), GetRandomClientName(), file_info, bad_node_tracker, monitors); + PartialMockFileHandle is("cluster", "file", io_service, GetRandomClientName(), file_info, bad_node_tracker, monitors); Status stat; size_t read = 0; @@ -170,7 +171,7 @@ TEST(BadDataNodeTest, NNEventCallback) { char buf[4096] = { 0, }; - IoServiceImpl io_service; + std::shared_ptr<IoService> io_service = IoService::MakeShared(); auto tracker = std::make_shared<BadDataNodeTracker>(); @@ -191,7 +192,7 @@ TEST(BadDataNodeTest, NNEventCallback) { return event_response::make_ok(); }); - PartialMockFileHandle is("cluster", "file", &io_service.io_service(), GetRandomClientName(), file_info, tracker, monitors); + PartialMockFileHandle is("cluster", "file", io_service, GetRandomClientName(), file_info, tracker, monitors); Status stat; size_t read = 0; @@ -234,10 +235,10 @@ TEST(BadDataNodeTest, RecoverableError) { char buf[4096] = { 0, }; - IoServiceImpl io_service; + std::shared_ptr<IoService> io_service = IoService::MakeShared(); auto tracker = std::make_shared<BadDataNodeTracker>(); auto monitors = std::make_shared<LibhdfsEvents>(); - PartialMockFileHandle is("cluster", "file", &io_service.io_service(), GetRandomClientName(), file_info, tracker, monitors); + PartialMockFileHandle is("cluster", "file", io_service, GetRandomClientName(), file_info, tracker, monitors); Status stat; size_t read = 0; EXPECT_CALL(*is.mock_reader_, AsyncReadBlock(_,_,_,_,_)) @@ -285,10 +286,10 @@ TEST(BadDataNodeTest, InternalError) { char buf[4096] = { 0, }; - IoServiceImpl io_service; + std::shared_ptr<IoService> io_service = IoService::MakeShared(); auto tracker = std::make_shared<BadDataNodeTracker>(); auto monitors = std::make_shared<LibhdfsEvents>(); - PartialMockFileHandle is("cluster", "file", &io_service.io_service(), GetRandomClientName(), file_info, tracker, monitors); + PartialMockFileHandle is("cluster", "file", io_service, GetRandomClientName(), file_info, tracker, monitors); Status stat; size_t read = 0; EXPECT_CALL(*is.mock_reader_, AsyncReadBlock(_,_,_,_,_)) http://git-wip-us.apache.org/repos/asf/hadoop/blob/eefe2a14/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_ioservice_test.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_ioservice_test.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_ioservice_test.cc index 5ee9789..2fdbd80 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_ioservice_test.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_ioservice_test.cc @@ -16,13 +16,15 @@ * limitations under the License. */ -#include "common/hdfs_ioservice.h" +#include "hdfspp/ioservice.h" #include <future> #include <functional> #include <thread> #include <string> + +#include <google/protobuf/stubs/common.h> #include <gmock/gmock.h> using ::testing::_; @@ -34,7 +36,7 @@ using namespace hdfs; // Make sure IoService spins up specified number of threads TEST(IoServiceTest, InitThreads) { #ifndef DISABLE_CONCURRENT_WORKERS - std::shared_ptr<IoServiceImpl> service = std::static_pointer_cast<IoServiceImpl>(IoService::MakeShared()); + std::shared_ptr<IoService> service = IoService::MakeShared(); EXPECT_NE(service, nullptr); unsigned int thread_count = 4; @@ -50,7 +52,7 @@ TEST(IoServiceTest, InitThreads) { // Make sure IoService defaults to logical thread count TEST(IoServiceTest, InitDefaultThreads) { #ifndef DISABLE_CONCURRENT_WORKERS - std::shared_ptr<IoServiceImpl> service = std::static_pointer_cast<IoServiceImpl>(IoService::MakeShared()); + std::shared_ptr<IoService> service = IoService::MakeShared(); EXPECT_NE(service, nullptr); unsigned int thread_count = std::thread::hardware_concurrency(); @@ -66,7 +68,7 @@ TEST(IoServiceTest, InitDefaultThreads) { // Check IoService::PostTask TEST(IoServiceTest, SimplePost) { - std::shared_ptr<IoServiceImpl> service = std::static_pointer_cast<IoServiceImpl>(IoService::MakeShared()); + std::shared_ptr<IoService> service = IoService::MakeShared(); EXPECT_NE(service, nullptr); unsigned int thread_count = std::thread::hardware_concurrency(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/eefe2a14/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/mock_connection.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/mock_connection.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/mock_connection.h index cd1fc12..de234ef 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/mock_connection.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/mock_connection.h @@ -49,7 +49,7 @@ public: virtual ~MockConnectionBase(); typedef std::pair<asio::error_code, std::string> ProducerResult; - void async_read_some(const MutableBuffers &buf, + void async_read_some(const MutableBuffer &buf, std::function<void (const asio::error_code & error, std::size_t bytes_transferred) > handler) override { if (produced_.size() == 0) { @@ -72,7 +72,7 @@ public: io_service_->post(std::bind(handler, asio::error_code(), len)); } - void async_write_some(const ConstBuffers &buf, + void async_write_some(const ConstBuffer &buf, std::function<void (const asio::error_code & error, std::size_t bytes_transferred) > handler) override { // CompletionResult res = OnWrite(buf); http://git-wip-us.apache.org/repos/asf/hadoop/blob/eefe2a14/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/remote_block_reader_test.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/remote_block_reader_test.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/remote_block_reader_test.cc index 80127f3..4b909b2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/remote_block_reader_test.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/remote_block_reader_test.cc @@ -69,14 +69,14 @@ public: /* event handler to trigger side effects */ std::function<void(void)> OnRead; - void async_read_some(const MutableBuffers &buf, + void async_read_some(const MutableBuffer &buf, std::function<void (const asio::error_code & error, std::size_t bytes_transferred) > handler) override { this->OnRead(); this->MockConnectionBase::async_read_some(buf, handler); } - void async_write_some(const ConstBuffers &buf, + void async_write_some(const ConstBuffer &buf, std::function<void (const asio::error_code & error, std::size_t bytes_transferred) > handler) override { this->MockConnectionBase::async_write_some(buf, handler); http://git-wip-us.apache.org/repos/asf/hadoop/blob/eefe2a14/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc index f998c7f..6bbe725 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc @@ -16,6 +16,8 @@ * limitations under the License. */ +#include "hdfspp/ioservice.h" + #include "mock_connection.h" #include "test.pb.h" #include "RpcHeader.pb.h" @@ -23,7 +25,6 @@ #include "common/namenode_info.h" #include <google/protobuf/io/coded_stream.h> - #include <gmock/gmock.h> using ::hadoop::common::RpcResponseHeaderProto; @@ -104,9 +105,10 @@ static inline std::pair<error_code, string> RpcResponse( using namespace hdfs; TEST(RpcEngineTest, TestRoundTrip) { - ::asio::io_service io_service; + + std::shared_ptr<IoService> io_service = IoService::MakeShared(); Options options; - std::shared_ptr<RpcEngine> engine = std::make_shared<RpcEngine>(&io_service, options, "foo", "", "protocol", 1); + std::shared_ptr<RpcEngine> engine = std::make_shared<RpcEngine>(io_service, options, "foo", "", "protocol", 1); auto conn = std::make_shared<RpcConnectionImpl<MockRPCConnection> >(engine); conn->TEST_set_connected(true); @@ -129,20 +131,20 @@ TEST(RpcEngineTest, TestRoundTrip) { EchoRequestProto req; req.set_message("foo"); std::shared_ptr<EchoResponseProto> resp(new EchoResponseProto()); - engine->AsyncRpc("test", &req, resp, [resp, &complete,&io_service](const Status &stat) { + engine->AsyncRpc("test", &req, resp, [resp, &complete,io_service](const Status &stat) { ASSERT_TRUE(stat.ok()); ASSERT_EQ("foo", resp->message()); complete = true; - io_service.stop(); + io_service->Stop(); }); - io_service.run(); + io_service->Run(); ASSERT_TRUE(complete); } TEST(RpcEngineTest, TestConnectionResetAndFail) { - ::asio::io_service io_service; + std::shared_ptr<IoService> io_service = IoService::MakeShared(); Options options; - std::shared_ptr<RpcEngine> engine = std::make_shared<RpcEngine>(&io_service, options, "foo", "", "protocol", 1); + std::shared_ptr<RpcEngine> engine = std::make_shared<RpcEngine>(io_service, options, "foo", "", "protocol", 1); auto conn = std::make_shared<RpcConnectionImpl<MockRPCConnection> >(engine); conn->TEST_set_connected(true); @@ -164,23 +166,23 @@ TEST(RpcEngineTest, TestConnectionResetAndFail) { req.set_message("foo"); std::shared_ptr<EchoResponseProto> resp(new EchoResponseProto()); - engine->AsyncRpc("test", &req, resp, [&complete, &io_service](const Status &stat) { + engine->AsyncRpc("test", &req, resp, [&complete, io_service](const Status &stat) { complete = true; - io_service.stop(); + io_service->Stop(); ASSERT_FALSE(stat.ok()); }); - io_service.run(); + io_service->Run(); ASSERT_TRUE(complete); } TEST(RpcEngineTest, TestConnectionResetAndRecover) { - ::asio::io_service io_service; + std::shared_ptr<IoService> io_service = IoService::MakeShared(); Options options; options.max_rpc_retries = 1; options.rpc_retry_delay_ms = 0; std::shared_ptr<SharedConnectionEngine> engine - = std::make_shared<SharedConnectionEngine>(&io_service, options, "foo", "", "protocol", 1); + = std::make_shared<SharedConnectionEngine>(io_service, options, "foo", "", "protocol", 1); // Normally determined during RpcEngine::Connect, but in this case options // provides enough info to determine policy here. @@ -206,22 +208,22 @@ TEST(RpcEngineTest, TestConnectionResetAndRecover) { req.set_message("foo"); std::shared_ptr<EchoResponseProto> resp(new EchoResponseProto()); - engine->AsyncRpc("test", &req, resp, [&complete, &io_service](const Status &stat) { + engine->AsyncRpc("test", &req, resp, [&complete, io_service](const Status &stat) { complete = true; - io_service.stop(); + io_service->Stop(); ASSERT_TRUE(stat.ok()); }); - io_service.run(); + io_service->Run(); ASSERT_TRUE(complete); } TEST(RpcEngineTest, TestConnectionResetAndRecoverWithDelay) { - ::asio::io_service io_service; + std::shared_ptr<IoService> io_service = IoService::MakeShared(); Options options; options.max_rpc_retries = 1; options.rpc_retry_delay_ms = 1; std::shared_ptr<SharedConnectionEngine> engine = - std::make_shared<SharedConnectionEngine>(&io_service, options, "foo", "", "protocol", 1); + std::make_shared<SharedConnectionEngine>(io_service, options, "foo", "", "protocol", 1); // Normally determined during RpcEngine::Connect, but in this case options // provides enough info to determine policy here. @@ -246,17 +248,17 @@ TEST(RpcEngineTest, TestConnectionResetAndRecoverWithDelay) { req.set_message("foo"); std::shared_ptr<EchoResponseProto> resp(new EchoResponseProto()); - engine->AsyncRpc("test", &req, resp, [&complete, &io_service](const Status &stat) { + engine->AsyncRpc("test", &req, resp, [&complete, io_service](const Status &stat) { complete = true; - io_service.stop(); + io_service->Stop(); ASSERT_TRUE(stat.ok()); }); - ::asio::deadline_timer timer(io_service); + ::asio::deadline_timer timer(io_service->GetRaw()); timer.expires_from_now(std::chrono::hours(100)); timer.async_wait([](const asio::error_code & err){(void)err; ASSERT_FALSE("Timed out"); }); - io_service.run(); + io_service->Run(); ASSERT_TRUE(complete); } @@ -267,7 +269,7 @@ TEST(RpcEngineTest, TestConnectionFailure) SharedMockConnection::SetSharedConnectionData(producer); // Error and no retry - ::asio::io_service io_service; + std::shared_ptr<IoService> io_service = IoService::MakeShared(); bool complete = false; @@ -275,16 +277,16 @@ TEST(RpcEngineTest, TestConnectionFailure) options.max_rpc_retries = 0; options.rpc_retry_delay_ms = 0; std::shared_ptr<SharedConnectionEngine> engine - = std::make_shared<SharedConnectionEngine>(&io_service, options, "foo", "", "protocol", 1); + = std::make_shared<SharedConnectionEngine>(io_service, options, "foo", "", "protocol", 1); EXPECT_CALL(*producer, Produce()) .WillOnce(Return(std::make_pair(make_error_code(::asio::error::connection_reset), ""))); - engine->Connect("", make_endpoint(), [&complete, &io_service](const Status &stat) { + engine->Connect("", make_endpoint(), [&complete, io_service](const Status &stat) { complete = true; - io_service.stop(); + io_service->Stop(); ASSERT_FALSE(stat.ok()); }); - io_service.run(); + io_service->Run(); ASSERT_TRUE(complete); } @@ -294,7 +296,7 @@ TEST(RpcEngineTest, TestConnectionFailureRetryAndFailure) producer->checkProducerForConnect = true; SharedMockConnection::SetSharedConnectionData(producer); - ::asio::io_service io_service; + std::shared_ptr<IoService> io_service = IoService::MakeShared(); bool complete = false; @@ -302,18 +304,18 @@ TEST(RpcEngineTest, TestConnectionFailureRetryAndFailure) options.max_rpc_retries = 2; options.rpc_retry_delay_ms = 0; std::shared_ptr<SharedConnectionEngine> engine = - std::make_shared<SharedConnectionEngine>(&io_service, options, "foo", "", "protocol", 1); + std::make_shared<SharedConnectionEngine>(io_service, options, "foo", "", "protocol", 1); EXPECT_CALL(*producer, Produce()) .WillOnce(Return(std::make_pair(make_error_code(::asio::error::connection_reset), ""))) .WillOnce(Return(std::make_pair(make_error_code(::asio::error::connection_reset), ""))) .WillOnce(Return(std::make_pair(make_error_code(::asio::error::connection_reset), ""))); - engine->Connect("", make_endpoint(), [&complete, &io_service](const Status &stat) { + engine->Connect("", make_endpoint(), [&complete, io_service](const Status &stat) { complete = true; - io_service.stop(); + io_service->Stop(); ASSERT_FALSE(stat.ok()); }); - io_service.run(); + io_service->Run(); ASSERT_TRUE(complete); } @@ -323,7 +325,7 @@ TEST(RpcEngineTest, TestConnectionFailureAndRecover) producer->checkProducerForConnect = true; SharedMockConnection::SetSharedConnectionData(producer); - ::asio::io_service io_service; + std::shared_ptr<IoService> io_service = IoService::MakeShared(); bool complete = false; @@ -331,29 +333,30 @@ TEST(RpcEngineTest, TestConnectionFailureAndRecover) options.max_rpc_retries = 1; options.rpc_retry_delay_ms = 0; std::shared_ptr<SharedConnectionEngine> engine = - std::make_shared<SharedConnectionEngine>(&io_service, options, "foo", "", "protocol", 1); + std::make_shared<SharedConnectionEngine>(io_service, options, "foo", "", "protocol", 1); EXPECT_CALL(*producer, Produce()) .WillOnce(Return(std::make_pair(make_error_code(::asio::error::connection_reset), ""))) .WillOnce(Return(std::make_pair(::asio::error_code(), ""))) .WillOnce(Return(std::make_pair(::asio::error::would_block, ""))); - engine->Connect("", make_endpoint(), [&complete, &io_service](const Status &stat) { + engine->Connect("", make_endpoint(), [&complete, io_service](const Status &stat) { complete = true; - io_service.stop(); + io_service->Stop(); ASSERT_TRUE(stat.ok()); }); - io_service.run(); + io_service->Run(); ASSERT_TRUE(complete); } TEST(RpcEngineTest, TestEventCallbacks) { - ::asio::io_service io_service; + std::shared_ptr<IoService> io_service = IoService::MakeShared(); + Options options; options.max_rpc_retries = 99; options.rpc_retry_delay_ms = 0; std::shared_ptr<SharedConnectionEngine> engine = - std::make_shared<SharedConnectionEngine>(&io_service, options, "foo", "", "protocol", 1); + std::make_shared<SharedConnectionEngine>(io_service, options, "foo", "", "protocol", 1); // Normally determined during RpcEngine::Connect, but in this case options // provides enough info to determine policy here. @@ -399,17 +402,18 @@ TEST(RpcEngineTest, TestEventCallbacks) std::shared_ptr<EchoResponseProto> resp(new EchoResponseProto()); bool complete = false; - engine->AsyncRpc("test", &req, resp, [&complete, &io_service](const Status &stat) { + engine->AsyncRpc("test", &req, resp, [&complete, io_service](const Status &stat) { complete = true; - io_service.stop(); + io_service->Stop(); ASSERT_TRUE(stat.ok()); }); // If you're adding event hooks you'll most likely need to update this. // It's a brittle test but makes it hard to miss control flow changes in RPC retry. - for(const auto& m : callbacks) + for(const auto& m : callbacks) { std::cerr << m << std::endl; - io_service.run(); + } + io_service->Run(); ASSERT_TRUE(complete); ASSERT_EQ(9, callbacks.size()); ASSERT_EQ(FS_NN_CONNECT_EVENT, callbacks[0]); // error @@ -430,7 +434,7 @@ TEST(RpcEngineTest, TestConnectionFailureAndAsyncRecover) producer->checkProducerForConnect = true; SharedMockConnection::SetSharedConnectionData(producer); - ::asio::io_service io_service; + std::shared_ptr<IoService> io_service = IoService::MakeShared(); bool complete = false; @@ -438,31 +442,31 @@ TEST(RpcEngineTest, TestConnectionFailureAndAsyncRecover) options.max_rpc_retries = 1; options.rpc_retry_delay_ms = 1; std::shared_ptr<SharedConnectionEngine> engine = - std::make_shared<SharedConnectionEngine>(&io_service, options, "foo", "", "protocol", 1); + std::make_shared<SharedConnectionEngine>(io_service, options, "foo", "", "protocol", 1); EXPECT_CALL(*producer, Produce()) .WillOnce(Return(std::make_pair(make_error_code(::asio::error::connection_reset), ""))) .WillOnce(Return(std::make_pair(::asio::error_code(), ""))) .WillOnce(Return(std::make_pair(::asio::error::would_block, ""))); - engine->Connect("", make_endpoint(), [&complete, &io_service](const Status &stat) { + engine->Connect("", make_endpoint(), [&complete, io_service](const Status &stat) { complete = true; - io_service.stop(); + io_service->Stop(); ASSERT_TRUE(stat.ok()); }); - ::asio::deadline_timer timer(io_service); + ::asio::deadline_timer timer(io_service->GetRaw()); timer.expires_from_now(std::chrono::hours(100)); timer.async_wait([](const asio::error_code & err){(void)err; ASSERT_FALSE("Timed out"); }); - io_service.run(); + io_service->Run(); ASSERT_TRUE(complete); } TEST(RpcEngineTest, TestTimeout) { - ::asio::io_service io_service; + std::shared_ptr<IoService> io_service = IoService::MakeShared(); Options options; options.rpc_timeout = 1; - std::shared_ptr<RpcEngine> engine = std::make_shared<RpcEngine>(&io_service, options, "foo", "", "protocol", 1); + std::shared_ptr<RpcEngine> engine = std::make_shared<RpcEngine>(io_service, options, "foo", "", "protocol", 1); auto conn = std::make_shared<RpcConnectionImpl<MockRPCConnection> >(engine); conn->TEST_set_connected(true); @@ -481,15 +485,15 @@ TEST(RpcEngineTest, TestTimeout) { std::shared_ptr<EchoResponseProto> resp(new EchoResponseProto()); engine->AsyncRpc("test", &req, resp, [resp, &complete,&io_service](const Status &stat) { complete = true; - io_service.stop(); + io_service->Stop(); ASSERT_FALSE(stat.ok()); }); - ::asio::deadline_timer timer(io_service); + ::asio::deadline_timer timer(io_service->GetRaw()); timer.expires_from_now(std::chrono::hours(100)); timer.async_wait([](const asio::error_code & err){(void)err; ASSERT_FALSE("Timed out"); }); - io_service.run(); + io_service->Run(); ASSERT_TRUE(complete); } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org