Repository: hadoop
Updated Branches:
  refs/heads/HDFS-8707 c2e1e23d8 -> e1f73feef


HDFS-10231: libhdfs++: Fix race conditions in RPC layer.  Contributed by Bob 
Hansen.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e1f73fee
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e1f73fee
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e1f73fee

Branch: refs/heads/HDFS-8707
Commit: e1f73feefb02544e4104e21b5390b965140775a8
Parents: c2e1e23
Author: Bob Hansen <b...@hp.com>
Authored: Mon Apr 4 08:53:52 2016 -0700
Committer: Bob Hansen <b...@hp.com>
Committed: Mon Apr 4 08:53:52 2016 -0700

----------------------------------------------------------------------
 .../native/libhdfspp/lib/rpc/rpc_connection.cc  | 84 +++++++++++++-------
 .../native/libhdfspp/lib/rpc/rpc_connection.h   | 52 +++++++++---
 .../main/native/libhdfspp/lib/rpc/rpc_engine.cc | 83 +++++++++----------
 .../main/native/libhdfspp/lib/rpc/rpc_engine.h  | 23 +++---
 .../native/libhdfspp/tests/rpc_engine_test.cc   | 12 +--
 5 files changed, 158 insertions(+), 96 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/e1f73fee/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc
index bed3347..6c3b82e 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc
@@ -158,15 +158,17 @@ void Request::OnResponseArrived(pbio::CodedInputStream 
*is,
 
 RpcConnection::RpcConnection(LockFreeRpcEngine *engine)
     : engine_(engine),
-      connected_(false) {}
+      connected_(kNotYetConnected) {}
 
 ::asio::io_service &RpcConnection::io_service() {
   return engine_->io_service();
 }
 
 void RpcConnection::StartReading() {
-  io_service().post(std::bind(&RpcConnection::OnRecvCompleted, this,
-                              ::asio::error_code(), 0));
+  auto shared_this = shared_from_this();
+  io_service().post([shared_this, this] () {
+    OnRecvCompleted(::asio::error_code(), 0);
+  });
 }
 
 void RpcConnection::AsyncFlushPendingRequests() {
@@ -174,6 +176,8 @@ void RpcConnection::AsyncFlushPendingRequests() {
   io_service().post([shared_this, this]() {
     std::lock_guard<std::mutex> state_lock(connection_state_lock_);
 
+    LOG_TRACE(kRPC, << "RpcConnection::AsyncRpc called (connected=" << 
ToString(connected_) << ")");
+
     if (!request_over_the_wire_) {
       FlushPendingRequests();
     }
@@ -281,40 +285,53 @@ void RpcConnection::AsyncRpc(
 
   auto r = std::make_shared<Request>(engine_, method_name, req,
                                      std::move(wrapped_handler));
-  pending_requests_.push_back(r);
-  FlushPendingRequests();
-}
 
-void RpcConnection::AsyncRawRpc(const std::string &method_name,
-                                const std::string &req,
-                                std::shared_ptr<std::string> resp,
-                                RpcCallback &&handler) {
-  std::lock_guard<std::mutex> state_lock(connection_state_lock_);
+  if (connected_ == kDisconnected) {
+    // Oops.  The connection failed _just_ before the engine got a chance
+    //    to send it.  Register it as a failure
+    Status status = Status::ResourceUnavailable("RpcConnection closed before 
send.");
+    auto r_vector = std::vector<std::shared_ptr<Request> > (1, r);
+    assert(r_vector[0].get() != nullptr);
 
-  std::shared_ptr<RpcConnection> shared_this = shared_from_this();
-  auto wrapped_handler = [shared_this, this, resp, handler](
-      pbio::CodedInputStream *is, const Status &status) {
-    if (status.ok()) {
-      uint32_t size = 0;
-      is->ReadVarint32(&size);
-      auto limit = is->PushLimit(size);
-      is->ReadString(resp.get(), limit);
-      is->PopLimit(limit);
+    engine_->AsyncRpcCommsError(status, shared_from_this(), r_vector);
+  } else {
+    pending_requests_.push_back(r);
+
+    if (connected_ == kConnected) { // Dont flush if we're waiting or 
handshaking
+      FlushPendingRequests();
     }
-    handler(status);
-  };
+  }
+}
 
-  auto r = std::make_shared<Request>(engine_, method_name, req,
-                                     std::move(wrapped_handler));
-  pending_requests_.push_back(r);
-  FlushPendingRequests();
+void RpcConnection::AsyncRpc(const std::vector<std::shared_ptr<Request> > & 
requests) {
+  std::lock_guard<std::mutex> state_lock(connection_state_lock_);
+  LOG_TRACE(kRPC, << "RpcConnection::AsyncRpc[] called; connected=" << 
ToString(connected_));
+
+  if (connected_ == kDisconnected) {
+    // Oops.  The connection failed _just_ before the engine got a chance
+    //    to send it.  Register it as a failure
+    Status status = Status::ResourceUnavailable("RpcConnection closed before 
send.");
+    engine_->AsyncRpcCommsError(status, shared_from_this(), requests);
+  } else {
+    pending_requests_.reserve(pending_requests_.size() + requests.size());
+    for (auto r: requests) {
+      pending_requests_.push_back(r);
+    }
+    if (connected_ == kConnected) { // Dont flush if we're waiting or 
handshaking
+      FlushPendingRequests();
+    }
+  }
 }
 
+
 void RpcConnection::PreEnqueueRequests(
     std::vector<std::shared_ptr<Request>> requests) {
   // Public method - acquire lock
   std::lock_guard<std::mutex> state_lock(connection_state_lock_);
-  assert(!connected_);
+
+  LOG_DEBUG(kRPC, << "RpcConnection::PreEnqueueRequests called");
+
+  assert(connected_ == kNotYetConnected);
 
   pending_requests_.insert(pending_requests_.end(), requests.begin(),
                            requests.end());
@@ -349,7 +366,7 @@ void RpcConnection::CommsError(const Status &status) {
                          std::make_move_iterator(pending_requests_.end()));
   pending_requests_.clear();
 
-  engine_->AsyncRpcCommsError(status, requestsToReturn);
+  engine_->AsyncRpcCommsError(status, shared_from_this(), requestsToReturn);
 }
 
 void RpcConnection::ClearAndDisconnect(const ::asio::error_code &ec) {
@@ -379,4 +396,15 @@ std::shared_ptr<Request> 
RpcConnection::RemoveFromRunningQueue(int call_id) {
   requests_on_fly_.erase(it);
   return req;
 }
+
+std::string RpcConnection::ToString(ConnectedState connected) {
+  switch(connected) {
+    case kNotYetConnected: return "NotYetConnected";
+    case kConnecting: return "Connecting";
+    case kConnected: return "Connected";
+    case kDisconnected: return "Disconnected";
+    default: return "Invalid ConnectedState";
+  }
+}
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e1f73fee/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h
index cab14fa..4c33a41 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h
@@ -34,6 +34,8 @@ template <class NextLayer>
 class RpcConnectionImpl : public RpcConnection {
 public:
   RpcConnectionImpl(RpcEngine *engine);
+  virtual ~RpcConnectionImpl() override;
+
   virtual void Connect(const std::vector<::asio::ip::tcp::endpoint> &server,
                        RpcCallback &handler);
   virtual void ConnectAndFlush(
@@ -49,7 +51,7 @@ public:
 
   NextLayer &next_layer() { return next_layer_; }
 
-  void TEST_set_connected(bool new_value) { connected_ = new_value; }
+  void TEST_set_connected(bool connected) { connected_ = connected ? 
kConnected : kNotYetConnected; }
 
  private:
   const Options options_;
@@ -66,7 +68,19 @@ RpcConnectionImpl<NextLayer>::RpcConnectionImpl(RpcEngine 
*engine)
       options_(engine->options()),
       next_layer_(engine->io_service()) {
     LOG_TRACE(kRPC, << "RpcConnectionImpl::RpcConnectionImpl called");
-  }
+}
+
+template <class NextLayer>
+RpcConnectionImpl<NextLayer>::~RpcConnectionImpl() {
+  LOG_DEBUG(kRPC, << "RpcConnectionImpl::~RpcConnectionImpl called &" << 
(void*)this);
+
+  std::lock_guard<std::mutex> state_lock(connection_state_lock_);
+  if (pending_requests_.size() > 0)
+    LOG_WARN(kRPC, << "RpcConnectionImpl::~RpcConnectionImpl called with items 
in the pending queue");
+  if (requests_on_fly_.size() > 0)
+    LOG_WARN(kRPC, << "RpcConnectionImpl::~RpcConnectionImpl called with items 
in the requests_on_fly queue");
+}
+
 
 template <class NextLayer>
 void RpcConnectionImpl<NextLayer>::Connect(
@@ -93,6 +107,16 @@ void RpcConnectionImpl<NextLayer>::ConnectAndFlush(
     return;
   }
 
+  if (connected_ == kConnected) {
+    FlushPendingRequests();
+    return;
+  }
+  if (connected_ != kNotYetConnected) {
+    LOG_WARN(kRPC, << "RpcConnectionImpl::ConnectAndFlush called while 
connected=" << ToString(connected_));
+    return;
+  }
+  connected_ = kConnecting;
+
   // Take the first endpoint, but remember the alternatives for later
   additional_endpoints_ = server;
   ::asio::ip::tcp::endpoint first_endpoint = additional_endpoints_.front();
@@ -169,8 +193,8 @@ void RpcConnectionImpl<NextLayer>::Handshake(RpcCallback 
&handler) {
                       [handshake_packet, handler, shared_this, this](
                           const ::asio::error_code &ec, size_t) {
                         Status status = ToStatus(ec);
-                        if (status.ok()) {
-                          connected_ = true;
+                        if (status.ok() && connected_ == kConnecting) {
+                          connected_ = kConnected;
                         }
                         handler(status);
                       });
@@ -208,9 +232,13 @@ void RpcConnectionImpl<NextLayer>::FlushPendingRequests() {
     return;
   }
 
-  if (!connected_) {
+  if (connected_ == kDisconnected) {
+    LOG_WARN(kRPC, << "RpcConnectionImpl::FlushPendingRequests attempted to 
flush a disconnected connection");
     return;
   }
+  if (connected_ != kConnected) {
+    LOG_DEBUG(kRPC, << "RpcConnectionImpl::FlushPendingRequests attempted to 
flush a " << ToString(connected_) << " connection");
+  }
 
   // Don't send if we don't need to
   if (request_over_the_wire_) {
@@ -218,19 +246,25 @@ void RpcConnectionImpl<NextLayer>::FlushPendingRequests() 
{
   }
 
   std::shared_ptr<Request> req = pending_requests_.front();
+  auto weak_req = std::weak_ptr<Request>(req);
   pending_requests_.erase(pending_requests_.begin());
 
   std::shared_ptr<RpcConnection> shared_this = shared_from_this();
+  auto weak_this = std::weak_ptr<RpcConnection>(shared_this);
   std::shared_ptr<std::string> payload = std::make_shared<std::string>();
   req->GetPacket(payload.get());
   if (!payload->empty()) {
+    assert(requests_on_fly_.find(req->call_id()) == requests_on_fly_.end());
     requests_on_fly_[req->call_id()] = req;
     request_over_the_wire_ = req;
 
     req->timer().expires_from_now(
         std::chrono::milliseconds(options_.rpc_timeout));
-    req->timer().async_wait([shared_this, this, req](const ::asio::error_code 
&ec) {
-        this->HandleRpcTimeout(req, ec);
+    req->timer().async_wait([weak_this, weak_req, this](const 
::asio::error_code &ec) {
+        auto timeout_this = weak_this.lock();
+        auto timeout_req = weak_req.lock();
+        if (timeout_this && timeout_req)
+          this->HandleRpcTimeout(timeout_req, ec);
     });
 
     asio::async_write(next_layer_, asio::buffer(*payload),
@@ -320,11 +354,11 @@ void RpcConnectionImpl<NextLayer>::Disconnect() {
   LOG_INFO(kRPC, << "RpcConnectionImpl::Disconnect called");
 
   request_over_the_wire_.reset();
-  if (connected_) {
+  if (connected_ == kConnecting || connected_ == kConnected) {
     next_layer_.cancel();
     next_layer_.close();
   }
-  connected_ = false;
+  connected_ = kDisconnected;
 }
 }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e1f73fee/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc
index 70b50cf..132bb69 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc
@@ -61,7 +61,6 @@ void RpcEngine::Shutdown() {
   LOG_DEBUG(kRPC, << "RpcEngine::Shutdown called");
   io_service_->post([this]() {
     std::lock_guard<std::mutex> state_lock(engine_state_lock_);
-    conn_->Disconnect();
     conn_.reset();
   });
 }
@@ -122,47 +121,33 @@ std::shared_ptr<RpcConnection> 
RpcEngine::InitializeConnection()
   return result;
 }
 
-
-Status RpcEngine::RawRpc(const std::string &method_name, const std::string 
&req,
-                         std::shared_ptr<std::string> resp) {
-  LOG_TRACE(kRPC, << "RpcEngine::RawRpc called");
-
-  std::shared_ptr<RpcConnection> conn;
-  {
-    std::lock_guard<std::mutex> state_lock(engine_state_lock_);
-    if (!conn_) {
-        conn_ = InitializeConnection();
-        conn_->ConnectAndFlush(last_endpoints_);
-      }
-    conn = conn_;
-  }
-
-  auto stat = std::make_shared<std::promise<Status>>();
-  std::future<Status> future(stat->get_future());
-  conn->AsyncRawRpc(method_name, req, resp,
-                     [stat](const Status &status) { stat->set_value(status); 
});
-  return future.get();
-}
-
 void RpcEngine::AsyncRpcCommsError(
     const Status &status,
+    std::shared_ptr<RpcConnection> failedConnection,
     std::vector<std::shared_ptr<Request>> pendingRequests) {
-  LOG_ERROR(kRPC, << "RpcEngine::AsyncRpcCommsError called");
+  LOG_ERROR(kRPC, << "RpcEngine::AsyncRpcCommsError called; conn=" << 
failedConnection.get() << " reqs=" << pendingRequests.size());
 
-  io_service().post([this, status, pendingRequests]() {
-    RpcCommsError(status, pendingRequests);
+  io_service().post([this, status, failedConnection, pendingRequests]() {
+    RpcCommsError(status, failedConnection, pendingRequests);
   });
 }
 
 void RpcEngine::RpcCommsError(
     const Status &status,
+    std::shared_ptr<RpcConnection> failedConnection,
     std::vector<std::shared_ptr<Request>> pendingRequests) {
   (void)status;
 
-  LOG_ERROR(kRPC, << "RpcEngine::RpcCommsError called");
+  LOG_ERROR(kRPC, << "RpcEngine::RpcCommsError called; conn=" << 
failedConnection.get() << " reqs=" << pendingRequests.size());
 
   std::lock_guard<std::mutex> state_lock(engine_state_lock_);
 
+  // If the failed connection is the current one, shut it down
+  //    It will be reconnected when there is work to do
+  if (failedConnection == conn_) {
+        conn_.reset();
+  }
+
   auto head_action = optional<RetryAction>();
 
   // Filter out anything with too many retries already
@@ -192,25 +177,35 @@ void RpcEngine::RpcCommsError(
     }
   }
 
-  // Close the connection and retry and requests that might have been sent to
-  //    the NN
-  if (!pendingRequests.empty() &&
-          head_action && head_action->action != RetryAction::FAIL) {
-    conn_ = InitializeConnection();
-
-    conn_->PreEnqueueRequests(pendingRequests);
-    if (head_action->delayMillis > 0) {
-      retry_timer.expires_from_now(
-          std::chrono::milliseconds(options_.rpc_retry_delay_ms));
-      retry_timer.async_wait([this](asio::error_code ec) {
-        if (!ec) conn_->ConnectAndFlush(last_endpoints_);
-      });
+  // If we have reqests that need to be re-sent, ensure that we have a 
connection
+  //   and send the requests to it
+  bool haveRequests = !pendingRequests.empty() &&
+          head_action && head_action->action != RetryAction::FAIL;
+
+  if (haveRequests) {
+    bool needNewConnection = !conn_;
+    if (needNewConnection) {
+      conn_ = InitializeConnection();
+      conn_->PreEnqueueRequests(pendingRequests);
+
+      if (head_action->delayMillis > 0) {
+        auto weak_conn = std::weak_ptr<RpcConnection>(conn_);
+        retry_timer.expires_from_now(
+            std::chrono::milliseconds(options_.rpc_retry_delay_ms));
+        retry_timer.async_wait([this, weak_conn](asio::error_code ec) {
+          auto strong_conn = weak_conn.lock();
+          if ( (!ec) && (strong_conn) ) {
+            strong_conn->ConnectAndFlush(last_endpoints_);
+          }
+        });
+      } else {
+        conn_->ConnectAndFlush(last_endpoints_);
+      }
     } else {
-      conn_->ConnectAndFlush(last_endpoints_);
+      // We have an existing connection (which might be closed; we don't know
+      //    until we hold the connection local) and should just add the new 
requests
+      conn_->AsyncRpc(pendingRequests);
     }
-  } else {
-    // Connection will try again if someone calls AsyncRpc
-    conn_.reset();
   }
 }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e1f73fee/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h
index 7b66ac0..8ea6e8d 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h
@@ -125,8 +125,7 @@ class RpcConnection : public 
std::enable_shared_from_this<RpcConnection> {
                 std::shared_ptr<::google::protobuf::MessageLite> resp,
                 const RpcCallback &handler);
 
-  void AsyncRawRpc(const std::string &method_name, const std::string &request,
-                   std::shared_ptr<std::string> resp, RpcCallback &&handler);
+  void AsyncRpc(const std::vector<std::shared_ptr<Request> > & requests);
 
   // Enqueue requests before the connection is connected.  Will be flushed
   //   on connect
@@ -182,7 +181,15 @@ class RpcConnection : public 
std::enable_shared_from_this<RpcConnection> {
 
   // Connection can have deferred connection, especially when we're pausing
   //   during retry
-  bool connected_;
+  enum ConnectedState {
+      kNotYetConnected,
+      kConnecting,
+      kConnected,
+      kDisconnected
+  };
+  static std::string ToString(ConnectedState connected);
+
+  ConnectedState connected_;
   // The request being sent over the wire; will also be in requests_on_fly_
   std::shared_ptr<Request> request_over_the_wire_;
   // Requests to be sent over the wire
@@ -207,6 +214,7 @@ class LockFreeRpcEngine {
 public:
   /* Enqueues a CommsError without acquiring a lock*/
   virtual void AsyncRpcCommsError(const Status &status,
+                      std::shared_ptr<RpcConnection> failedConnection,
                       std::vector<std::shared_ptr<Request>> pendingRequests) = 
0;
 
 
@@ -254,19 +262,16 @@ class RpcEngine : public LockFreeRpcEngine {
   Status Rpc(const std::string &method_name,
              const ::google::protobuf::MessageLite *req,
              const std::shared_ptr<::google::protobuf::MessageLite> &resp);
-  /**
-   * Send raw bytes as RPC payload. This is intended to be used in JNI
-   * bindings only.
-   **/
-  Status RawRpc(const std::string &method_name, const std::string &req,
-                std::shared_ptr<std::string> resp);
+
   void Start();
   void Shutdown();
 
   /* Enqueues a CommsError without acquiring a lock*/
   void AsyncRpcCommsError(const Status &status,
+                     std::shared_ptr<RpcConnection> failedConnection,
                      std::vector<std::shared_ptr<Request>> pendingRequests) 
override;
   void RpcCommsError(const Status &status,
+                     std::shared_ptr<RpcConnection> failedConnection,
                      std::vector<std::shared_ptr<Request>> pendingRequests);
 
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e1f73fee/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc
index b7d5d0b..b5f4d9a 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc
@@ -106,8 +106,8 @@ TEST(RpcEngineTest, TestRoundTrip) {
   ::asio::io_service io_service;
   Options options;
   RpcEngine engine(&io_service, options, "foo", "", "protocol", 1);
-  RpcConnectionImpl<MockRPCConnection> *conn =
-      new RpcConnectionImpl<MockRPCConnection>(&engine);
+  auto conn =
+      std::make_shared<RpcConnectionImpl<MockRPCConnection> >(&engine);
   conn->TEST_set_connected(true);
   conn->StartReading();
 
@@ -142,8 +142,8 @@ TEST(RpcEngineTest, TestConnectionResetAndFail) {
   ::asio::io_service io_service;
   Options options;
   RpcEngine engine(&io_service, options, "foo", "", "protocol", 1);
-  RpcConnectionImpl<MockRPCConnection> *conn =
-      new RpcConnectionImpl<MockRPCConnection>(&engine);
+  auto conn =
+      std::make_shared<RpcConnectionImpl<MockRPCConnection> >(&engine);
   conn->TEST_set_connected(true);
   conn->StartReading();
 
@@ -436,8 +436,8 @@ TEST(RpcEngineTest, TestTimeout) {
   Options options;
   options.rpc_timeout = 1;
   RpcEngine engine(&io_service, options, "foo", "", "protocol", 1);
-  RpcConnectionImpl<MockRPCConnection> *conn =
-      new RpcConnectionImpl<MockRPCConnection>(&engine);
+  auto conn =
+      std::make_shared<RpcConnectionImpl<MockRPCConnection> >(&engine);
   conn->TEST_set_connected(true);
   conn->StartReading();
 

Reply via email to