This is an automated email from the ASF dual-hosted git repository.

nic443 pushed a commit to branch feat/consul-registry-refactor
in repository https://gitbox.apache.org/repos/asf/apisix.git

commit 197960929610e29d79aa39e9400481a5056e3eb0
Author: Nic <[email protected]>
AuthorDate: Thu Apr 16 12:06:30 2026 +0800

    fix: address review feedback on consul discovery refactoring
    
    - Shallow-copy default_service in create_registry to avoid mutating
      the caller's table
    - Create the default registry on all workers (not just the privileged
      agent) so nodes()/control_api() work on non-privileged workers
    - Replace flush_all() with prefix-based flush to avoid clobbering other
      registries in the shared dict
    - Resolve default_reg inside the control_api handler on each request
      instead of capturing it once when control_api() is called
    - Replace ngx.timer.every with self-rescheduling ngx.timer.at so stop_flag
      can actually halt future wakeups for non-keepalive servers
    - Remove the now-unused ngx_timer_every local alias
    - Fix fetch_services_from_server docstring to match actual return values
---
 apisix/discovery/consul/client.lua |  7 +++--
 apisix/discovery/consul/init.lua   | 52 ++++++++++++++++++++++++++------------
 2 files changed, 41 insertions(+), 18 deletions(-)

diff --git a/apisix/discovery/consul/client.lua 
b/apisix/discovery/consul/client.lua
index bce968377..cfb46d343 100644
--- a/apisix/discovery/consul/client.lua
+++ b/apisix/discovery/consul/client.lua
@@ -274,8 +274,11 @@ end
 -- ─── service fetching ─────────────────────────────────────────────────
 
 --- Fetch all services from a single consul server.
---- Returns: up_services (table of service_name -> nodes), catalog_index, 
health_index
----          On failure: nil, err_string
+--- Returns: up_services, err, catalog_index, health_index
+---   up_services: table of key -> nodes (nil on failure)
+---   err: error string (nil on success)
+---   catalog_index: latest catalog index from consul
+---   health_index: latest health index from consul
 ---
 --- options:
 ---   default_weight     (number)    default node weight
diff --git a/apisix/discovery/consul/init.lua b/apisix/discovery/consul/init.lua
index 640eff99b..a9b791bb9 100644
--- a/apisix/discovery/consul/init.lua
+++ b/apisix/discovery/consul/init.lua
@@ -26,7 +26,6 @@ local error              = error
 local ngx                = ngx
 local tonumber           = tonumber
 local ngx_timer_at       = ngx.timer.at
-local ngx_timer_every    = ngx.timer.every
 local log                = core.log
 local json_delay_encode  = core.json.delay_encode
 local process            = require("ngx.process")
@@ -230,12 +229,25 @@ local function check_keepalive(reg, consul_server, 
retry_delay)
         return
     end
 
-    if consul_server.keepalive and not exiting() then
+    if exiting() then
+        return
+    end
+
+    if consul_server.keepalive then
         local ok, err = ngx_timer_at(0, _M.connect, reg, consul_server, 
retry_delay)
         if not ok then
             log.error("create ngx_timer_at got error: ", err)
             return
         end
+    else
+        -- self-rescheduling poll: use timer.at instead of timer.every
+        -- so stop_flag can actually halt future wakeups
+        local ok, err = ngx_timer_at(consul_server.fetch_interval,
+                                     _M.connect, reg, consul_server, 
retry_delay)
+        if not ok then
+            log.error("create ngx_timer_at got error: ", err)
+            return
+        end
     end
 end
 
@@ -365,10 +377,13 @@ function _M.create_registry(conf, options)
         skip_map[v] = true
     end
 
-    -- build default_service
+    -- clone default_service to avoid mutating the caller's table
     local default_svc
     if conf.default_service then
-        default_svc = conf.default_service
+        default_svc = {}
+        for k, v in pairs(conf.default_service) do
+            default_svc[k] = v
+        end
         default_svc.weight = conf.weight
     end
 
@@ -421,10 +436,6 @@ function _M.start_registry(reg)
         if not ok then
             error("create consul got error: " .. timer_err)
         end
-
-        if server.keepalive == false then
-            ngx_timer_every(server.fetch_interval, _M.connect, reg, server)
-        end
     end
 end
 
@@ -591,13 +602,6 @@ function _M.init_worker()
     end
     consul_dict = dict
 
-    if process.type() ~= "privileged agent" then
-        return
-    end
-
-    -- flush stale data that may persist across reloads
-    dict:flush_all()
-
     log.notice("consul_conf: ", json_delay_encode(consul_conf, true))
 
     -- shallow copy to avoid mutating cached config
@@ -607,7 +611,23 @@ function _M.init_worker()
     end
     conf.id = "default"
 
+    -- create default registry on all workers so nodes()/control_api() work
     local reg = _M.create_registry(conf)
+
+    -- only the privileged agent runs timers / writes to shared dict
+    if process.type() ~= "privileged agent" then
+        return
+    end
+
+    -- flush stale data for the default registry that may persist across 
reloads
+    local prefix = "default/"
+    local all_keys = dict:get_keys(0)
+    for _, key in ipairs(all_keys) do
+        if core.string.has_prefix(key, prefix) then
+            dict:delete(key)
+        end
+    end
+
     _M.start_registry(reg)
 end
 
@@ -619,12 +639,12 @@ end
 
 
 function _M.control_api()
-    local default_reg = registries["default"]
     return {
         {
             methods = {"GET"},
             uris = {"/show_dump_file"},
             handler = function()
+                local default_reg = registries["default"]
                 return show_dump_file(default_reg)
             end,
         }

Reply via email to