Copilot commented on code in PR #13066:
URL: https://github.com/apache/apisix/pull/13066#discussion_r2894958250
##########
apisix/discovery/consul/init.lua:
##########
@@ -66,53 +73,73 @@ local _M = {
}
-local function discovery_consul_callback(data, event, source, pid)
- all_services = data
- log.notice("update local variable all_services, event is: ", event,
- "source: ", source, "server pid:", pid,
- ", all services: ", json_delay_encode(all_services, true))
-end
-
-
function _M.all_nodes()
- return all_services
+ local keys = consul_dict:get_keys(0)
+ local services = core.table.new(0, #keys)
+ for _, key in ipairs(keys) do
+ local value = consul_dict:get(key)
+ if value then
+ local nodes, err = core.json.decode(value)
+ if nodes then
+ services[key] = nodes
+ else
+            log.error("failed to decode nodes for service: ", key, ", error: ", err)
+ end
+ end
+ end
+ return services
end
function _M.nodes(service_name)
- if not all_services then
-        log.error("all_services is nil, failed to fetch nodes for : ", service_name)
- return
+ local value = consul_dict:get(service_name)
+ if not value then
Review Comment:
`nodes()` uses `consul_dict:get(service_name)`. During reloads or refresh
patterns that use `flush_all()`, `get()` returns nil even though stale values
may still exist, causing avoidable 503s. Consider using `get_stale()` (and only
falling back to default when both fresh+stale are missing). Also consider
clearing `nodes_cache[service_name]` when the dict has no value to avoid
retaining entries for removed services indefinitely.
```suggestion
if value == nil then
-- try to use stale value during reloads or after flush_all()
value = consul_dict:get_stale(service_name)
end
if value == nil then
        -- no fresh or stale value in shared dict; clear per-worker cache entry
nodes_cache[service_name] = nil
```
##########
apisix/discovery/consul/init.lua:
##########
@@ -611,35 +637,32 @@ end
function _M.init_worker()
local consul_conf = local_conf.discovery.consul
+ dump_params = consul_conf.dump
- if consul_conf.dump then
- local dump = consul_conf.dump
- dump_params = dump
-
- if dump.load_on_init then
- read_dump_services()
- end
- end
-
- events = require("apisix.events")
- events_list = events:event_list(
- "discovery_consul_update_all_services",
- "updating"
- )
-
- if 0 ~= ngx_worker_id() then
-        events:register(discovery_consul_callback, events_list._source, events_list.updating)
- return
- end
-
- log.notice("consul_conf: ", json_delay_encode(consul_conf, true))
default_weight = consul_conf.weight
sort_type = consul_conf.sort_type
-- set default service, used when the server node cannot be found
if consul_conf.default_service then
default_service = consul_conf.default_service
default_service.weight = default_weight
end
+
+ if process.type() ~= "privileged agent" then
+ return
+ end
+
+ -- flush stale data that may persist across reloads,
+ -- since consul_services is re-initialized empty
+ consul_dict:flush_all()
Review Comment:
`consul_dict:flush_all()` runs in the privileged agent during init_worker,
which will clear the shared dict for *all* workers during an nginx reload.
Since `nodes()` reads via `consul_dict:get()`, in-flight requests handled by
old workers during a graceful reload can suddenly see missing services and
return 503s. Consider avoiding a global flush on reload (e.g., use a
generation/version keyspace and swap, or rebuild `consul_services` from
existing dict keys), or at minimum ensure readers can still access stale values
during refresh and free memory after flushing (see `flush_expired`).
```suggestion
-- flush expired stale data that may persist across reloads,
-- since consul_services is re-initialized empty but existing
-- unexpired shared dict entries may still be in use by workers
consul_dict:flush_expired()
```
##########
apisix/discovery/consul/init.lua:
##########
@@ -611,35 +637,32 @@ end
function _M.init_worker()
local consul_conf = local_conf.discovery.consul
+ dump_params = consul_conf.dump
- if consul_conf.dump then
- local dump = consul_conf.dump
- dump_params = dump
-
- if dump.load_on_init then
- read_dump_services()
- end
- end
-
- events = require("apisix.events")
- events_list = events:event_list(
- "discovery_consul_update_all_services",
- "updating"
- )
-
- if 0 ~= ngx_worker_id() then
-        events:register(discovery_consul_callback, events_list._source, events_list.updating)
- return
- end
-
- log.notice("consul_conf: ", json_delay_encode(consul_conf, true))
default_weight = consul_conf.weight
sort_type = consul_conf.sort_type
-- set default service, used when the server node cannot be found
if consul_conf.default_service then
default_service = consul_conf.default_service
default_service.weight = default_weight
end
+
+ if process.type() ~= "privileged agent" then
+ return
+ end
+
+ -- flush stale data that may persist across reloads,
+ -- since consul_services is re-initialized empty
+ consul_dict:flush_all()
+
Review Comment:
This PR changes the cross-worker data sharing mechanism for Consul nodes to
rely on `lua_shared_dict`, but there isn’t a regression test validating the
restart/reload scenario from #12398 (e.g., repeated HUP reloads / restarts
while sending requests should not produce intermittent 503s once Consul has
data). Adding a Test::Nginx case similar to existing HUP-based tests would help
prevent regressions.
```suggestion
```
##########
apisix/discovery/consul/init.lua:
##########
@@ -149,14 +176,21 @@ local function read_dump_services()
return
end
- all_services = entity.services
- log.info("load dump file into memory success")
+ for k, v in pairs(entity.services) do
+ local content, json_err = core.json.encode(v)
+ if content then
+ consul_dict:set(k, content)
+ else
+            log.error("failed to encode dump service: ", k, ", error: ", json_err)
+ end
+ end
+ log.info("load dump file into shared dict success")
end
local function write_dump_services()
local entity = {
- services = all_services,
+ services = _M.all_nodes(),
last_update = ngx.time(),
expire = dump_params.expire, -- later need handle it
Review Comment:
`write_dump_services()` now calls `_M.all_nodes()` which scans all shdict
keys and JSON-decodes every service on each dump write. If dump is enabled and
service count is high, this can add noticeable CPU overhead at each Consul
update. Consider maintaining a privileged-agent-only in-memory snapshot for
dump generation, or passing the updated snapshot into `write_dump_services` to
avoid full scans/decodes.
##########
apisix/discovery/consul/init.lua:
##########
@@ -66,53 +73,73 @@ local _M = {
}
-local function discovery_consul_callback(data, event, source, pid)
- all_services = data
- log.notice("update local variable all_services, event is: ", event,
- "source: ", source, "server pid:", pid,
- ", all services: ", json_delay_encode(all_services, true))
-end
-
-
function _M.all_nodes()
- return all_services
+ local keys = consul_dict:get_keys(0)
+ local services = core.table.new(0, #keys)
+ for _, key in ipairs(keys) do
+ local value = consul_dict:get(key)
+ if value then
+ local nodes, err = core.json.decode(value)
+ if nodes then
+ services[key] = nodes
+ else
+            log.error("failed to decode nodes for service: ", key, ", error: ", err)
+ end
+ end
+ end
+ return services
end
function _M.nodes(service_name)
- if not all_services then
-        log.error("all_services is nil, failed to fetch nodes for : ", service_name)
- return
+ local value = consul_dict:get(service_name)
+ if not value then
+        log.error("consul service not found: ", service_name, ", return default service")
+ return default_service and {default_service}
end
- local resp_list = all_services[service_name]
+ local cached = nodes_cache[service_name]
+ if cached and cached.raw == value then
+ return cached.nodes
+ end
- if not resp_list then
-        log.error("fetch nodes failed by ", service_name, ", return default service")
+ local nodes, err = core.json.decode(value)
+ if not nodes then
+ log.error("fetch nodes failed by ", service_name, ", error: ", err)
return default_service and {default_service}
end
-    log.info("process id: ", ngx_worker_id(), ", all_services[", service_name, "] = ",
- json_delay_encode(resp_list, true))
+ nodes_cache[service_name] = {raw = value, nodes = nodes}
+
+ log.info("process id: ", ngx_worker_id(), ", [", service_name, "] = ",
+ json_delay_encode(nodes, true))
- return resp_list
+ return nodes
end
local function update_all_services(consul_server_url, up_services)
- -- clean old unused data
+ -- write new/updated values first so readers never see a missing service
+ for k, v in pairs(up_services) do
+ local content, err = core.json.encode(v)
+ if content then
+ consul_dict:set(k, content)
Review Comment:
`consul_dict:set(k, content)` ignores the return values. If the shared dict
is full, `set` can fail or evict existing entries, which would silently drop
nodes and lead to routing failures. Please check the return values
(ok/err/forcible) and log/handle failures (e.g., advise increasing
`lua_shared_dict consul` size).
```suggestion
local ok, set_err, forcible = consul_dict:set(k, content)
if not ok then
                log.error("failed to set nodes for service: ", k, ", error: ", set_err,
                          ", please consider increasing lua_shared_dict consul size")
elseif forcible then
                log.warn("consul shared dict is full, forcibly evicting items while ",
                         "setting nodes for service: ", k,
                         ", please consider increasing lua_shared_dict consul size")
end
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]