This is an automated email from the ASF dual-hosted git repository.
twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 006daf0c3 fix(txn): wrong RESP reply for EXEC if error occurred in commit (#2996)
006daf0c3 is described below
commit 006daf0c39bf473a10f9f61bbe1e9d6455255907
Author: Twice <[email protected]>
AuthorDate: Fri May 30 16:04:45 2025 +0800
fix(txn): wrong RESP reply for EXEC if error occurred in commit (#2996)
Signed-off-by: PragmaTwice <[email protected]>
---
src/commands/cmd_txn.cc | 15 ++++++++++++---
src/server/redis_connection.cc | 19 ++++++++++++-------
src/server/redis_connection.h | 3 +++
3 files changed, 27 insertions(+), 10 deletions(-)
diff --git a/src/commands/cmd_txn.cc b/src/commands/cmd_txn.cc
index 5f922ddf5..16d25a58e 100644
--- a/src/commands/cmd_txn.cc
+++ b/src/commands/cmd_txn.cc
@@ -78,8 +78,6 @@ class CommandExec : public Commander {
}
auto storage = srv->storage;
- // Reply multi length first
- conn->Reply(redis::MultiLen(conn->GetMultiExecCommands()->size()));
// Execute multi-exec commands
conn->SetInExec();
auto s = storage->BeginTxn();
@@ -91,7 +89,18 @@ class CommandExec : public Commander {
// So, if conn->IsMultiError(), the transaction should still be committed.
s = storage->CommitTxn();
}
- return s;
+
+ conn->ResetMultiExec();
+ reset_multiexec.Disable();
+
+ if (s) {
+ conn->Reply(Array(conn->GetQueuedReplies()));
+ } else {
+
conn->Reply(Array(std::vector<std::string>(conn->GetQueuedReplies().size(),
redis::Error(s))));
+ }
+
+ conn->ClearQueuedReplies();
+ return Status::OK();
}
};
diff --git a/src/server/redis_connection.cc b/src/server/redis_connection.cc
index 47279f29e..f113e0041 100644
--- a/src/server/redis_connection.cc
+++ b/src/server/redis_connection.cc
@@ -139,10 +139,17 @@ void Connection::Reply(const std::string &msg) {
if (reply_mode_ == ReplyMode::OFF) {
return;
}
+
owner_->srv->stats.IncrOutboundBytes(msg.size());
- redis::Reply(bufferevent_get_output(bev_), msg);
+ if (in_exec_) {
+ queued_replies_.push_back(msg);
+ } else {
+ redis::Reply(bufferevent_get_output(bev_), msg);
+ }
}
+const std::vector<std::string> &Connection::GetQueuedReplies() const { return
queued_replies_; }
+
void Connection::SendFile(int fd) {
// NOTE: we don't need to close the fd, the libevent will do that
auto output = bufferevent_get_output(bev_);
@@ -392,6 +399,9 @@ void Connection::ExecuteCommands(std::deque<CommandTokens>
*to_process_cmds) {
bool is_multi_exec = IsFlagEnabled(Connection::kMultiExec);
if (IsFlagEnabled(redis::Connection::kCloseAfterReply) && !is_multi_exec)
break;
+ auto multi_error_exit = MakeScopeExit([&] {
+ if (is_multi_exec) multi_error_ = true;
+ });
auto cmd_s = Server::LookupAndCreateCommand(cmd_tokens.front());
if (!cmd_s.IsOK()) {
@@ -403,7 +413,6 @@ void Connection::ExecuteCommands(std::deque<CommandTokens>
*to_process_cmds) {
EnableFlag(kCloseAsync);
return;
}
- if (is_multi_exec) multi_error_ = true;
Reply(redis::Error(
{Status::NotOK,
fmt::format("unknown command `{}`, with args beginning with: {}",
cmd_name,
@@ -418,7 +427,6 @@ void Connection::ExecuteCommands(std::deque<CommandTokens>
*to_process_cmds) {
int tokens = static_cast<int>(cmd_tokens.size());
if (!attributes->CheckArity(tokens)) {
- if (is_multi_exec) multi_error_ = true;
Reply(redis::Error({Status::NotOK, "wrong number of arguments"}));
continue;
}
@@ -452,21 +460,18 @@ void
Connection::ExecuteCommands(std::deque<CommandTokens> *to_process_cmds) {
if (srv_->IsLoading() && !(cmd_flags & kCmdLoading)) {
Reply(redis::Error({Status::RedisLoading, errRestoringBackup}));
- if (is_multi_exec) multi_error_ = true;
continue;
}
current_cmd->SetArgs(cmd_tokens);
auto s = current_cmd->Parse();
if (!s.IsOK()) {
- if (is_multi_exec) multi_error_ = true;
Reply(redis::Error(s));
continue;
}
if (is_multi_exec && (cmd_flags & kCmdNoMulti)) {
Reply(redis::Error({Status::NotOK, fmt::format("{} inside MULTI is not
allowed", util::ToUpper(cmd_name))}));
- multi_error_ = true;
continue;
}
@@ -478,7 +483,6 @@ void Connection::ExecuteCommands(std::deque<CommandTokens>
*to_process_cmds) {
if (config->cluster_enabled) {
s = srv_->cluster->CanExecByMySelf(attributes, cmd_tokens, this);
if (!s.IsOK()) {
- if (is_multi_exec) multi_error_ = true;
Reply(redis::Error(s));
continue;
}
@@ -489,6 +493,7 @@ void Connection::ExecuteCommands(std::deque<CommandTokens>
*to_process_cmds) {
DisableFlag(kAsking);
}
+ multi_error_exit.Disable();
// We don't execute commands, but queue them, and then execute in EXEC command
if (is_multi_exec && !in_exec_ && !(cmd_flags & kCmdBypassMulti)) {
multi_cmds_.emplace_back(std::move(cmd_tokens));
diff --git a/src/server/redis_connection.h b/src/server/redis_connection.h
index d31fbce1c..462292521 100644
--- a/src/server/redis_connection.h
+++ b/src/server/redis_connection.h
@@ -71,6 +71,8 @@ class Connection : public EvbufCallbackBase<Connection> {
std::string ToString();
void Reply(const std::string &msg);
+ const std::vector<std::string> &GetQueuedReplies() const;
+ void ClearQueuedReplies() { queued_replies_.clear(); }
RESP GetProtocolVersion() const { return protocol_version_; }
void SetProtocolVersion(RESP version) { protocol_version_ = version; }
std::string Bool(bool b) const { return redis::Bool(protocol_version_, b); }
@@ -227,6 +229,7 @@ class Connection : public EvbufCallbackBase<Connection> {
RESP protocol_version_ = RESP::v2;
ReplyMode reply_mode_ = ReplyMode::ON;
+ std::vector<std::string> queued_replies_;
};
} // namespace redis