This is an automated email from the ASF dual-hosted git repository.
hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new c230dd0ec feat(replication): clean up all wait_contexts in
CleanupWaitConnection (#3104)
c230dd0ec is described below
commit c230dd0ec15e0ac62d0b67061c642bf84c5d9637
Author: Zhixin Wen <[email protected]>
AuthorDate: Sat Aug 9 01:43:13 2025 -0700
feat(replication): clean up all wait_contexts in CleanupWaitConnection
(#3104)
---
src/server/server.cc | 23 ++++++++++++++++++-----
1 file changed, 18 insertions(+), 5 deletions(-)
diff --git a/src/server/server.cc b/src/server/server.cc
index 90d793391..edd3b0a20 100644
--- a/src/server/server.cc
+++ b/src/server/server.cc
@@ -737,11 +737,24 @@ void
Server::WakeupWaitConnections(rocksdb::SequenceNumber seq) {
void Server::CleanupWaitConnection(redis::Connection *conn) {
std::unique_lock<std::shared_mutex> guard(wait_contexts_mu_);
- auto it = std::find_if(wait_contexts_.begin(), wait_contexts_.end(),
- [conn](const auto &pair) { return pair.second.conn ==
conn; });
- if (it != wait_contexts_.end()) {
- wait_contexts_.erase(it);
- DecrBlockedClientNum();
+ // Remove all wait contexts that match the given connection
+ auto it = wait_contexts_.begin();
+ int erased_count = 0;
+ while (it != wait_contexts_.end()) {
+ if (it->second.conn == conn) {
+ it = wait_contexts_.erase(it);
+ erased_count++;
+ // Technically only one client is unblocked, but we call
IncrBlockedClientNum for each added wait context,
+ // so we need to call DecrBlockedClientNum for each erased wait context.
+ // Multiple wait contexts on the same connection should not happen, but
we should be defensive.
+ DecrBlockedClientNum();
+ } else {
+ ++it;
+ }
+ }
+
+ if (erased_count > 0) {
+ warn("[server] {} wait contexts found for connection with fd {}, expect
1", erased_count, conn->GetFD());
}
}