This is an automated email from the ASF dual-hosted git repository.
wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new adb886d3e fix(asio): fix the crash caused by early invalidated socket
(#2111)
adb886d3e is described below
commit adb886d3e82676566afa6faabde3789b20ed120f
Author: Yingchun Lai <[email protected]>
AuthorDate: Wed Sep 11 11:54:27 2024 +0800
fix(asio): fix the crash caused by early invalidated socket (#2111)
Fix https://github.com/apache/incubator-pegasus/issues/307.
This patch fixes the crash bug by lazily closing the socket after all socket
references are released, to ensure thread safety.
This patch also includes some clang-tidy fixes.
---
.clang-tidy | 2 +-
build_tools/clang_tidy.py | 2 +-
src/runtime/rpc/asio_rpc_session.cpp | 21 +++++++++++++++------
src/runtime/rpc/asio_rpc_session.h | 9 +++++----
src/runtime/rpc/network.cpp | 8 +++++++-
src/runtime/rpc/network.h | 6 ++----
src/runtime/rpc/network.sim.h | 20 ++++++++++----------
7 files changed, 41 insertions(+), 27 deletions(-)
diff --git a/.clang-tidy b/.clang-tidy
index 1cdfca281..20ca366c0 100644
--- a/.clang-tidy
+++ b/.clang-tidy
@@ -18,7 +18,7 @@
#
https://releases.llvm.org/14.0.0/tools/clang/tools/extra/docs/clang-tidy/index.html
CheckOptions: []
-Checks:
'abseil-*,boost-*,bugprone-*,cert-*,clang-analyzer-*,concurrency-*,cppcoreguidelines-*,darwin-*,fuchsia-*,google-*,hicpp-*,linuxkernel-*,llvm-*,misc-*,modernize-*,performance-*,portability-*,readability-*,-cppcoreguidelines-pro-type-union-access,-llvm-include-order,-misc-definitions-in-headers,-modernize-use-trailing-return-type,-cppcoreguidelines-macro-usage,-modernize-replace-disallow-copy-and-assign-macro,-bugprone-macro-parentheses,-cppcoreguidelines-avoid-non-const-global-va
[...]
+Checks:
'abseil-*,boost-*,bugprone-*,cert-*,clang-analyzer-*,concurrency-*,cppcoreguidelines-*,darwin-*,fuchsia-*,google-*,hicpp-*,linuxkernel-*,llvm-*,misc-*,modernize-*,performance-*,portability-*,readability-*,-cppcoreguidelines-pro-type-union-access,-llvm-include-order,-misc-definitions-in-headers,-modernize-use-trailing-return-type,-cppcoreguidelines-macro-usage,-modernize-replace-disallow-copy-and-assign-macro,-bugprone-macro-parentheses,-cppcoreguidelines-avoid-non-const-global-va
[...]
ExtraArgs:
ExtraArgsBefore: []
FormatStyle: none
diff --git a/build_tools/clang_tidy.py b/build_tools/clang_tidy.py
index ed4f4d52a..2c3645d08 100755
--- a/build_tools/clang_tidy.py
+++ b/build_tools/clang_tidy.py
@@ -60,7 +60,7 @@ def run_tidy(sha="HEAD", is_rev_range=False):
"clang-tidy",
"-p0",
"-path", BUILD_PATH,
-
"-checks=-cppcoreguidelines-pro-type-union-access,-llvm-include-order,-misc-definitions-in-headers,-modernize-use-trailing-return-type,-cppcoreguidelines-macro-usage,-modernize-replace-disallow-copy-and-assign-macro,-bugprone-macro-parentheses,-cppcoreguidelines-avoid-non-const-global-variables,-fuchsia-statically-constructed-objects,-fuchsia-overloaded-operator,-bugprone-easily-swappable-parameters,-cppcoreguidelines-non-private-member-variables-in-classes,-misc-non-p
[...]
+
"-checks=-cppcoreguidelines-pro-type-union-access,-llvm-include-order,-misc-definitions-in-headers,-modernize-use-trailing-return-type,-cppcoreguidelines-macro-usage,-modernize-replace-disallow-copy-and-assign-macro,-bugprone-macro-parentheses,-cppcoreguidelines-avoid-non-const-global-variables,-fuchsia-statically-constructed-objects,-fuchsia-overloaded-operator,-bugprone-easily-swappable-parameters,-cppcoreguidelines-non-private-member-variables-in-classes,-misc-non-p
[...]
"-extra-arg=-language=c++",
"-extra-arg=-std=c++17",
"-extra-arg=-Ithirdparty/output/include"]
diff --git a/src/runtime/rpc/asio_rpc_session.cpp
b/src/runtime/rpc/asio_rpc_session.cpp
index 39ac3e6c0..331d0253e 100644
--- a/src/runtime/rpc/asio_rpc_session.cpp
+++ b/src/runtime/rpc/asio_rpc_session.cpp
@@ -130,7 +130,7 @@ void asio_rpc_session::do_read(int read_next)
} else {
LOG_ERROR("asio read from {} failed: {}", _remote_addr,
ec.message());
}
- on_failure();
+ on_failure(false);
} else {
_reader.mark_read(length);
@@ -151,7 +151,7 @@ void asio_rpc_session::do_read(int read_next)
if (read_next == -1) {
LOG_ERROR("asio read from {} failed", _remote_addr);
- on_failure();
+ on_failure(false);
} else {
start_read_next(read_next);
}
@@ -197,16 +197,25 @@ asio_rpc_session::asio_rpc_session(asio_network_provider
&net,
set_options();
}
-void asio_rpc_session::close()
+asio_rpc_session::~asio_rpc_session()
{
+ // Because every async_* invoking adds the reference counter and releases
the reference counter
+ // in corresponding callback, it's certain that the reference counter is
zero in its
+ // destructor, which means there is no inflight invoking, then it's safe
to close the socket.
+ asio_rpc_session::close();
+}
+void asio_rpc_session::close()
+{
boost::system::error_code ec;
_socket->shutdown(boost::asio::socket_base::shutdown_type::shutdown_both,
ec);
- if (ec)
+ if (ec) {
LOG_WARNING("asio socket shutdown failed, error = {}", ec.message());
+ }
_socket->close(ec);
- if (ec)
+ if (ec) {
LOG_WARNING("asio socket close failed, error = {}", ec.message());
+ }
}
void asio_rpc_session::connect()
@@ -222,7 +231,7 @@ void asio_rpc_session::connect()
set_options();
set_connected();
- on_send_completed();
+ on_send_completed(0);
start_read_next();
} else {
LOG_ERROR(
diff --git a/src/runtime/rpc/asio_rpc_session.h
b/src/runtime/rpc/asio_rpc_session.h
index e3f5da4e2..7a7cda8b3 100644
--- a/src/runtime/rpc/asio_rpc_session.h
+++ b/src/runtime/rpc/asio_rpc_session.h
@@ -51,10 +51,14 @@ public:
message_parser_ptr &parser,
bool is_client);
- ~asio_rpc_session() override = default;
+ ~asio_rpc_session() override;
void send(uint64_t signature) override;
+ // The under layer socket will be invalidated after being closed.
+ //
+ // It's needed to prevent the '_socket' to be closed while the socket's
async_* interfaces are
+ // in flight.
void close() override;
void connect() override;
@@ -69,9 +73,6 @@ private:
}
}
-private:
- // boost::asio::socket is thread-unsafe, must use lock to prevent a
- // reading/writing socket being modified or closed concurrently.
std::shared_ptr<boost::asio::ip::tcp::socket> _socket;
};
diff --git a/src/runtime/rpc/network.cpp b/src/runtime/rpc/network.cpp
index c572b19a8..28b6a731c 100644
--- a/src/runtime/rpc/network.cpp
+++ b/src/runtime/rpc/network.cpp
@@ -429,8 +429,14 @@ bool rpc_session::on_disconnected(bool is_write)
void rpc_session::on_failure(bool is_write)
{
+ // Just update the state machine here.
if (on_disconnected(is_write)) {
- close();
+ // The under layer socket may be used by async_* interfaces
concurrently, it's not thread
+ // safe to invalidate the '_socket', it should be invalidated when the
session is
+ // destroyed.
+ LOG_WARNING("disconnect to remote {}, the socket will be lazily closed
when the session "
+ "destroyed",
+ _remote_addr);
}
}
diff --git a/src/runtime/rpc/network.h b/src/runtime/rpc/network.h
index 5a9bb0609..aca926743 100644
--- a/src/runtime/rpc/network.h
+++ b/src/runtime/rpc/network.h
@@ -274,8 +274,8 @@ public:
// should always be called in lock
bool unlink_message_for_send();
virtual void send(uint64_t signature) = 0;
- void on_send_completed(uint64_t signature = 0);
- virtual void on_failure(bool is_write = false);
+ void on_send_completed(uint64_t signature);
+ virtual void on_failure(bool is_write);
protected:
///
@@ -314,7 +314,6 @@ protected:
uint64_t _message_sent;
// ]
-protected:
///
/// change status and check status
///
@@ -327,7 +326,6 @@ protected:
void clear_send_queue(bool resend_msgs);
bool on_disconnected(bool is_write);
-protected:
// constant info
connection_oriented_network &_net;
dsn::rpc_address _remote_addr;
diff --git a/src/runtime/rpc/network.sim.h b/src/runtime/rpc/network.sim.h
index f7954afbf..1e2633344 100644
--- a/src/runtime/rpc/network.sim.h
+++ b/src/runtime/rpc/network.sim.h
@@ -50,15 +50,15 @@ public:
::dsn::rpc_address remote_addr,
message_parser_ptr &parser);
- virtual void connect();
+ void connect() override;
- virtual void send(uint64_t signature) override;
+ void send(uint64_t signature) override;
- virtual void do_read(int sz) override {}
+ void do_read(int sz) override {}
- virtual void close() override {}
+ void close() override {}
- virtual void on_failure(bool is_write = false) override {}
+ void on_failure(bool is_write) override {}
};
class sim_server_session : public rpc_session
@@ -69,15 +69,15 @@ public:
rpc_session_ptr &client,
message_parser_ptr &parser);
- virtual void send(uint64_t signature) override;
+ void send(uint64_t signature) override;
- virtual void connect() {}
+ void connect() override {}
- virtual void do_read(int sz) override {}
+ void do_read(int sz) override {}
- virtual void close() override {}
+ void close() override {}
- virtual void on_failure(bool is_write = false) override {}
+ void on_failure(bool is_write) override {}
private:
rpc_session_ptr _client;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]