This is an automated email from the ASF dual-hosted git repository. nic443 pushed a commit to branch feat/consul-registry-refactor in repository https://gitbox.apache.org/repos/asf/apisix.git
commit 197960929610e29d79aa39e9400481a5056e3eb0 Author: Nic <[email protected]> AuthorDate: Thu Apr 16 12:06:30 2026 +0800 fix: address review feedback on consul discovery refactoring - Clone default_service in create_registry to avoid mutating caller's table - Create default registry on all workers (not just privileged agent) so nodes()/control_api() work on non-privileged workers - Replace flush_all() with prefix-based flush to avoid clobbering other registries in the shared dict - Resolve default_reg inside control_api handler to avoid stale capture - Replace ngx.timer.every with self-rescheduling ngx.timer.at so stop_flag can actually halt future wakeups for non-keepalive servers - Remove unused ngx_timer_every import - Fix fetch_services_from_server docstring to match actual return values --- apisix/discovery/consul/client.lua | 7 +++-- apisix/discovery/consul/init.lua | 52 ++++++++++++++++++++++++++------------ 2 files changed, 41 insertions(+), 18 deletions(-) diff --git a/apisix/discovery/consul/client.lua b/apisix/discovery/consul/client.lua index bce968377..cfb46d343 100644 --- a/apisix/discovery/consul/client.lua +++ b/apisix/discovery/consul/client.lua @@ -274,8 +274,11 @@ end -- ─── service fetching ───────────────────────────────────────────────── --- Fetch all services from a single consul server. ---- Returns: up_services (table of service_name -> nodes), catalog_index, health_index ---- On failure: nil, err_string +--- Returns: up_services, err, catalog_index, health_index +--- up_services: table of key -> nodes (nil on failure) +--- err: error string (nil on success) +--- catalog_index: latest catalog index from consul +--- health_index: latest health index from consul --- --- options: --- default_weight (number) default node weight diff --git a/apisix/discovery/consul/init.lua b/apisix/discovery/consul/init.lua index 640eff99b..a9b791bb9 100644 --- a/apisix/discovery/consul/init.lua +++ b/apisix/discovery/consul/init.lua @@ -26,7 +26,6 @@ local error = error local ngx = ngx local tonumber = tonumber local ngx_timer_at = ngx.timer.at -local ngx_timer_every = ngx.timer.every local log = core.log local json_delay_encode = core.json.delay_encode local process = require("ngx.process") @@ -230,12 +229,25 @@ local function check_keepalive(reg, consul_server, retry_delay) return end - if consul_server.keepalive and not exiting() then + if exiting() then + return + end + + if consul_server.keepalive then local ok, err = ngx_timer_at(0, _M.connect, reg, consul_server, retry_delay) if not ok then log.error("create ngx_timer_at got error: ", err) return end + else + -- self-rescheduling poll: use timer.at instead of timer.every + -- so stop_flag can actually halt future wakeups + local ok, err = ngx_timer_at(consul_server.fetch_interval, + _M.connect, reg, consul_server, retry_delay) + if not ok then + log.error("create ngx_timer_at got error: ", err) + return + end end end @@ -365,10 +377,13 @@ function _M.create_registry(conf, options) skip_map[v] = true end - -- build default_service + -- clone default_service to avoid mutating the caller's table local default_svc if conf.default_service then - default_svc = conf.default_service + default_svc = {} + for k, v in pairs(conf.default_service) do + default_svc[k] = v + end default_svc.weight = conf.weight end @@ -421,10 +436,6 @@ function _M.start_registry(reg) if not ok then error("create consul got error: " .. timer_err) end - - if server.keepalive == false then - ngx_timer_every(server.fetch_interval, _M.connect, reg, server) - end end end @@ -591,13 +602,6 @@ function _M.init_worker() end consul_dict = dict - if process.type() ~= "privileged agent" then - return - end - - -- flush stale data that may persist across reloads - dict:flush_all() - log.notice("consul_conf: ", json_delay_encode(consul_conf, true)) -- shallow copy to avoid mutating cached config @@ -607,7 +611,23 @@ function _M.init_worker() end conf.id = "default" + -- create default registry on all workers so nodes()/control_api() work local reg = _M.create_registry(conf) + + -- only the privileged agent runs timers / writes to shared dict + if process.type() ~= "privileged agent" then + return + end + + -- flush stale data for the default registry that may persist across reloads + local prefix = "default/" + local all_keys = dict:get_keys(0) + for _, key in ipairs(all_keys) do + if core.string.has_prefix(key, prefix) then + dict:delete(key) + end + end + _M.start_registry(reg) end @@ -619,12 +639,12 @@ end function _M.control_api() - local default_reg = registries["default"] return { { methods = {"GET"}, uris = {"/show_dump_file"}, handler = function() + local default_reg = registries["default"] return show_dump_file(default_reg) end, }
