This is an automated email from the ASF dual-hosted git repository.
twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 921078fba feat(function): propagate FUNCTION FLUSH/DELETE to replicas (#3121)
921078fba is described below
commit 921078fba1e1720b77a9ba04ba9dae1aeb5a4ba0
Author: Twice <[email protected]>
AuthorDate: Sun Aug 17 23:51:05 2025 +0800
feat(function): propagate FUNCTION FLUSH/DELETE to replicas (#3121)
---
src/commands/cmd_function.cc | 4 ++++
src/server/server.cc | 28 +++++++++++++++-------------
src/server/server.h | 1 -
3 files changed, 19 insertions(+), 14 deletions(-)
diff --git a/src/commands/cmd_function.cc b/src/commands/cmd_function.cc
index 7d2934d86..0161dd691 100644
--- a/src/commands/cmd_function.cc
+++ b/src/commands/cmd_function.cc
@@ -74,12 +74,16 @@ struct CommandFunction : Commander {
}
auto s = lua::FunctionDelete(ctx, conn, libname);
if (!s) return s;
+ s = srv->Propagate(engine::kPropagateScriptCommand, args_);
+ if (!s) return s;
*output = RESP_OK;
return Status::OK();
} else if (parser.EatEqICase("flush")) {
auto s = lua::FunctionFlush(conn, &ctx);
if (!s) return s;
+ s = srv->Propagate(engine::kPropagateScriptCommand, args_);
+ if (!s) return s;
*output = RESP_OK;
return Status::OK();
diff --git a/src/server/server.cc b/src/server/server.cc
index edd3b0a20..4a707fd58 100644
--- a/src/server/server.cc
+++ b/src/server/server.cc
@@ -38,6 +38,7 @@
#include <shared_mutex>
#include <utility>
+#include "commands/command_parser.h"
#include "commands/commander.h"
#include "common/string_util.h"
#include "config/config.h"
@@ -1885,20 +1886,21 @@ Status Server::Propagate(const std::string &channel, const std::vector<std::stri
return storage->WriteToPropagateCF(ctx, channel, value);
}
-Status Server::ExecPropagateScriptCommand(const std::vector<std::string> &tokens) {
- auto subcommand = util::ToLower(tokens[1]);
- if (subcommand == "flush") {
- ScriptReset();
- }
- return Status::OK();
-}
-
Status Server::ExecPropagatedCommand(const std::vector<std::string> &tokens) {
- if (tokens.empty()) return Status::OK();
-
- auto command = util::ToLower(tokens[0]);
- if (command == "script" && tokens.size() >= 2) {
- return ExecPropagateScriptCommand(tokens);
+ CommandParser parser(tokens);
+ if (parser.EatEqICase("script")) {
+ if (parser.EatEqICase("flush")) {
+ // here we must acquire the global lock to guarantee that
+ // no EVAL or FCALL is executing while resetting lua state.
+ auto guard = WorkExclusivityGuard();
+ ScriptReset();
+ }
+ } else if (parser.EatEqICase("function")) {
+ if (parser.EatEqICase("delete") || parser.EatEqICase("flush")) {
+ // same as above to acquire the global lock
+ auto guard = WorkExclusivityGuard();
+ ScriptReset();
+ }
}
return Status::OK();
diff --git a/src/server/server.h b/src/server/server.h
index ecaa4c94b..db74f481a 100644
--- a/src/server/server.h
+++ b/src/server/server.h
@@ -318,7 +318,6 @@ class Server {
Status Propagate(const std::string &channel, const std::vector<std::string> &tokens) const;
Status ExecPropagatedCommand(const std::vector<std::string> &tokens);
- Status ExecPropagateScriptCommand(const std::vector<std::string> &tokens);
LogCollector<PerfEntry> *GetPerfLog() { return &perf_log_; }
LogCollector<SlowEntry> *GetSlowLog() { return &slow_log_; }