This is an automated email from the ASF dual-hosted git repository.
twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 5d71a8178 feat(cmds): add support for CLIENT REPLY subcommand (#2943)
5d71a8178 is described below
commit 5d71a81781e30558994c188b5aa1c0857f59ce7c
Author: DCjanus <[email protected]>
AuthorDate: Thu May 8 21:57:30 2025 +0800
feat(cmds): add support for CLIENT REPLY subcommand (#2943)
Co-authored-by: Twice <[email protected]>
---
src/commands/cmd_server.cc | 30 +++++++++++++++++--
src/server/redis_connection.cc | 7 +++++
src/server/redis_connection.h | 12 ++++++++
.../unit/introspection/introspection_test.go | 34 ++++++++++++++++++++++
tests/gocase/util/tcp_client.go | 10 +++++++
5 files changed, 90 insertions(+), 3 deletions(-)
diff --git a/src/commands/cmd_server.cc b/src/commands/cmd_server.cc
index d1359087c..661df2cf5 100644
--- a/src/commands/cmd_server.cc
+++ b/src/commands/cmd_server.cc
@@ -392,7 +392,7 @@ class CommandClient : public Commander {
public:
Status Parse(const std::vector<std::string> &args) override {
subcommand_ = util::ToLower(args[1]);
- // subcommand: getname id kill list info setname
+ // subcommand: getname id kill list info setname reply
if ((subcommand_ == "id" || subcommand_ == "getname" || subcommand_ ==
"list" || subcommand_ == "info") &&
args.size() == 2) {
return Status::OK();
@@ -412,6 +412,23 @@ class CommandClient : public Commander {
return Status::OK();
}
+ if (subcommand_ == "reply") {
+ if (args.size() != 3) {
+ return {Status::RedisParseErr, errInvalidSyntax};
+ }
+ auto mode_str = util::ToLower(args[2]);
+ if (mode_str == "on") {
+ reply_mode_ = redis::Connection::ReplyMode::ON;
+ } else if (mode_str == "off") {
+ reply_mode_ = redis::Connection::ReplyMode::OFF;
+ } else if (mode_str == "skip") {
+ reply_mode_ = redis::Connection::ReplyMode::SKIP;
+ } else {
+ return {Status::RedisParseErr, errInvalidSyntax};
+ }
+ return Status::OK();
+ }
+
if ((subcommand_ == "kill")) {
if (args.size() == 2) {
return {Status::RedisParseErr, errInvalidSyntax};
@@ -464,7 +481,7 @@ class CommandClient : public Commander {
}
return Status::OK();
}
- return {Status::RedisInvalidCmd, "Syntax error, try CLIENT LIST|INFO|KILL
ip:port|GETNAME|SETNAME"};
+ return {Status::RedisInvalidCmd, "Syntax error, try CLIENT LIST|INFO|KILL
ip:port|GETNAME|SETNAME|REPLY"};
}
Status Execute([[maybe_unused]] engine::Context &ctx, Server *srv,
Connection *conn, std::string *output) override {
@@ -497,15 +514,22 @@ class CommandClient : public Commander {
*output = redis::RESP_OK;
}
return Status::OK();
+ } else if (subcommand_ == "reply") {
+ conn->SetReplyMode(reply_mode_);
+ if (reply_mode_ != redis::Connection::ReplyMode::SKIP) {
+ *output = redis::RESP_OK;
+ }
+ return Status::OK();
}
- return {Status::RedisInvalidCmd, "Syntax error, try CLIENT LIST|INFO|KILL
ip:port|GETNAME|SETNAME"};
+ return {Status::RedisInvalidCmd, "Syntax error, try CLIENT LIST|INFO|KILL
ip:port|GETNAME|SETNAME|REPLY"};
}
private:
std::string addr_;
std::string conn_name_;
std::string subcommand_;
+ redis::Connection::ReplyMode reply_mode_ = redis::Connection::ReplyMode::ON;
bool skipme_ = false;
int64_t kill_type_ = 0;
uint64_t id_ = 0;
diff --git a/src/server/redis_connection.cc b/src/server/redis_connection.cc
index 397810873..443f29a07 100644
--- a/src/server/redis_connection.cc
+++ b/src/server/redis_connection.cc
@@ -132,6 +132,13 @@ void Connection::OnEvent(bufferevent *bev, int16_t events)
{
}
void Connection::Reply(const std::string &msg) {
+ if (reply_mode_ == ReplyMode::SKIP) {
+ reply_mode_ = ReplyMode::ON;
+ return;
+ }
+ if (reply_mode_ == ReplyMode::OFF) {
+ return;
+ }
owner_->srv->stats.IncrOutboundBytes(msg.size());
redis::Reply(bufferevent_get_output(bev_), msg);
}
diff --git a/src/server/redis_connection.h b/src/server/redis_connection.h
index e8b44d941..d31fbce1c 100644
--- a/src/server/redis_connection.h
+++ b/src/server/redis_connection.h
@@ -50,6 +50,12 @@ class Connection : public EvbufCallbackBase<Connection> {
kAsking = 1 << 10,
};
+ enum class ReplyMode {
+ ON, // Always reply to every command (default)
+ OFF, // Never reply to any command
+ SKIP // Skip reply for the next command, then automatically switch back
to ON
+ };
+
explicit Connection(bufferevent *bev, Worker *owner);
~Connection();
@@ -181,6 +187,10 @@ class Connection : public EvbufCallbackBase<Connection> {
std::set<std::string> watched_keys;
std::atomic<bool> watched_keys_modified = false;
+ // Reply mode getter/setter
+ void SetReplyMode(ReplyMode mode) { reply_mode_ = mode; }
+ ReplyMode GetReplyMode() const { return reply_mode_; }
+
private:
uint64_t id_ = 0;
std::atomic<int> flags_ = 0;
@@ -215,6 +225,8 @@ class Connection : public EvbufCallbackBase<Connection> {
bool importing_ = false;
RESP protocol_version_ = RESP::v2;
+
+ ReplyMode reply_mode_ = ReplyMode::ON;
};
} // namespace redis
diff --git a/tests/gocase/unit/introspection/introspection_test.go
b/tests/gocase/unit/introspection/introspection_test.go
index af15117df..1b17a98ca 100644
--- a/tests/gocase/unit/introspection/introspection_test.go
+++ b/tests/gocase/unit/introspection/introspection_test.go
@@ -265,6 +265,40 @@ func TestIntrospection(t *testing.T) {
require.NoError(t, rdb.Do(ctx, "SET", "key", "value").Err())
require.EqualValues(t, 1, rdb.Do(ctx, "MOVE", "key", "0").Val())
})
+
+ // Test CLIENT REPLY subcommand behaviors
+ t.Run("CLIENT REPLY mode switching", func(t *testing.T) {
+ c := srv.NewTCPClient()
+ defer func() { require.NoError(t, c.Close()) }()
+
+ // Should reply by default
+ require.NoError(t, c.WriteArgs("ECHO", "default"))
+ c.MustReadBulkString(t, "default")
+
+ // Set to OFF, following commands should not reply
+ require.NoError(t, c.WriteArgs("CLIENT", "REPLY", "OFF"))
+ require.NoError(t, c.WriteArgs("ECHO", "off"))
+ // No reply expected here, do not read
+
+ // Set back to ON, commands should reply again
+ require.NoError(t, c.WriteArgs("CLIENT", "REPLY", "ON"))
+ c.MustRead(t, "+OK")
+ require.NoError(t, c.WriteArgs("ECHO", "on"))
+ c.MustReadBulkString(t, "on")
+
+ // Set to SKIP, next command should not reply, then reply
resumes
+ require.NoError(t, c.WriteArgs("CLIENT", "REPLY", "SKIP"))
+ // No reply expected here, do not read
+
+ require.NoError(t, c.WriteArgs("ECHO", "skip1"))
+ // No reply expected here, do not read
+
+ require.NoError(t, c.WriteArgs("ECHO", "skip2"))
+ c.MustReadBulkString(t, "skip2")
+
+ require.NoError(t, c.WriteArgs("ECHO", "skip3"))
+ c.MustReadBulkString(t, "skip3")
+ })
}
func TestMultiServerIntrospection(t *testing.T) {
diff --git a/tests/gocase/util/tcp_client.go b/tests/gocase/util/tcp_client.go
index 46ed3ac9a..3cd9de2b1 100644
--- a/tests/gocase/util/tcp_client.go
+++ b/tests/gocase/util/tcp_client.go
@@ -87,6 +87,16 @@ func (c *TCPClient) MustReadStrings(t testing.TB, s
[]string) {
}
}
+func (c *TCPClient) MustReadBulkString(t testing.TB, s string) {
+ r, err := c.ReadLine()
+ require.NoError(t, err)
+ require.Equal(t, "$"+strconv.Itoa(len(s)), r)
+
+ r, err = c.ReadLine()
+ require.NoError(t, err)
+ require.Equal(t, s, r)
+}
+
func (c *TCPClient) MustReadStringsWithKey(t testing.TB, key string, s
[]string) {
r, err := c.ReadLine()
require.NoError(t, err)