This is an automated email from the ASF dual-hosted git repository.

twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new 921078fba feat(function): propagate FUNCTION FLUSH/DELETE to replicas 
(#3121)
921078fba is described below

commit 921078fba1e1720b77a9ba04ba9dae1aeb5a4ba0
Author: Twice <[email protected]>
AuthorDate: Sun Aug 17 23:51:05 2025 +0800

    feat(function): propagate FUNCTION FLUSH/DELETE to replicas (#3121)
---
 src/commands/cmd_function.cc |  4 ++++
 src/server/server.cc         | 28 +++++++++++++++-------------
 src/server/server.h          |  1 -
 3 files changed, 19 insertions(+), 14 deletions(-)

diff --git a/src/commands/cmd_function.cc b/src/commands/cmd_function.cc
index 7d2934d86..0161dd691 100644
--- a/src/commands/cmd_function.cc
+++ b/src/commands/cmd_function.cc
@@ -74,12 +74,16 @@ struct CommandFunction : Commander {
       }
       auto s = lua::FunctionDelete(ctx, conn, libname);
       if (!s) return s;
+      s = srv->Propagate(engine::kPropagateScriptCommand, args_);
+      if (!s) return s;
 
       *output = RESP_OK;
       return Status::OK();
     } else if (parser.EatEqICase("flush")) {
       auto s = lua::FunctionFlush(conn, &ctx);
       if (!s) return s;
+      s = srv->Propagate(engine::kPropagateScriptCommand, args_);
+      if (!s) return s;
 
       *output = RESP_OK;
       return Status::OK();
diff --git a/src/server/server.cc b/src/server/server.cc
index edd3b0a20..4a707fd58 100644
--- a/src/server/server.cc
+++ b/src/server/server.cc
@@ -38,6 +38,7 @@
 #include <shared_mutex>
 #include <utility>
 
+#include "commands/command_parser.h"
 #include "commands/commander.h"
 #include "common/string_util.h"
 #include "config/config.h"
@@ -1885,20 +1886,21 @@ Status Server::Propagate(const std::string &channel, 
const std::vector<std::stri
   return storage->WriteToPropagateCF(ctx, channel, value);
 }
 
-Status Server::ExecPropagateScriptCommand(const std::vector<std::string> 
&tokens) {
-  auto subcommand = util::ToLower(tokens[1]);
-  if (subcommand == "flush") {
-    ScriptReset();
-  }
-  return Status::OK();
-}
-
 Status Server::ExecPropagatedCommand(const std::vector<std::string> &tokens) {
-  if (tokens.empty()) return Status::OK();
-
-  auto command = util::ToLower(tokens[0]);
-  if (command == "script" && tokens.size() >= 2) {
-    return ExecPropagateScriptCommand(tokens);
+  CommandParser parser(tokens);
+  if (parser.EatEqICase("script")) {
+    if (parser.EatEqICase("flush")) {
+      // here we must acquire the global lock to guarantee that
+      // no EVAL or FCALL is executing while resetting lua state.
+      auto guard = WorkExclusivityGuard();
+      ScriptReset();
+    }
+  } else if (parser.EatEqICase("function")) {
+    if (parser.EatEqICase("delete") || parser.EatEqICase("flush")) {
+      // same as above to acquire the global lock
+      auto guard = WorkExclusivityGuard();
+      ScriptReset();
+    }
   }
 
   return Status::OK();
diff --git a/src/server/server.h b/src/server/server.h
index ecaa4c94b..db74f481a 100644
--- a/src/server/server.h
+++ b/src/server/server.h
@@ -318,7 +318,6 @@ class Server {
 
   Status Propagate(const std::string &channel, const std::vector<std::string> 
&tokens) const;
   Status ExecPropagatedCommand(const std::vector<std::string> &tokens);
-  Status ExecPropagateScriptCommand(const std::vector<std::string> &tokens);
 
   LogCollector<PerfEntry> *GetPerfLog() { return &perf_log_; }
   LogCollector<SlowEntry> *GetSlowLog() { return &slow_log_; }

Reply via email to