Copilot commented on code in PR #13230:
URL: https://github.com/apache/apisix/pull/13230#discussion_r3090717641
##########
apisix/discovery/consul/init.lua:
##########
@@ -252,496 +223,410 @@ local function show_dump_file()
end
-local function get_retry_delay(retry_delay)
- if not retry_delay or retry_delay >= max_retry_time then
- retry_delay = 1
- else
- retry_delay = retry_delay * 4
+-- ─── polling loop ─────────────────────────────────────────────────────
+
+local function check_keepalive(reg, consul_server, retry_delay)
+ if reg.stop_flag then
+ return
end
- return retry_delay
+ if consul_server.keepalive and not exiting() then
+ local ok, err = ngx_timer_at(0, _M.connect, reg, consul_server,
retry_delay)
+ if not ok then
+ log.error("create ngx_timer_at got error: ", err)
+ return
+ end
+ end
end
-local function get_opts(consul_server, is_catalog)
- local opts = {
- host = consul_server.host,
- port = consul_server.port,
- connect_timeout = consul_server.connect_timeout,
- read_timeout = consul_server.read_timeout,
- default_args = {
- token = consul_server.token,
- }
- }
- if not consul_server.keepalive then
- return opts
+function _M.connect(premature, reg, consul_server, retry_delay)
+ if premature or reg.stop_flag then
+ return
end
- opts.default_args.wait = consul_server.wait_timeout --blocked wait!=0;
unblocked by wait=0
+ local catalog_thread, spawn_catalog_err =
thread_spawn(consul_client.watch_catalog,
+ consul_server)
+ if not catalog_thread then
+ local random_delay = math_random(default_random_range)
+ log.error("failed to spawn thread watch catalog: ", spawn_catalog_err,
+ ", retry connecting consul after ", random_delay, " seconds")
+ core_sleep(random_delay)
- if is_catalog then
- opts.default_args.index = consul_server.catalog_index
- else
- opts.default_args.index = consul_server.health_index
+ check_keepalive(reg, consul_server, retry_delay)
+ return
end
- return opts
-end
-
-
-local function watch_catalog(consul_server)
- local client = resty_consul:new(get_opts(consul_server, true))
-
- ::RETRY::
- local watch_result, watch_err =
client:get(consul_server.consul_watch_catalog_url)
- local watch_error_info = (watch_err ~= nil and watch_err)
- or ((watch_result ~= nil and watch_result.status
~= 200)
- and watch_result.status)
- if watch_error_info then
- log.error("connect consul: ", consul_server.consul_server_url,
- " by sub url: ", consul_server.consul_watch_catalog_url,
- ", got watch result: ", json_delay_encode(watch_result),
- ", with error: ", watch_error_info)
+ local health_thread, err = thread_spawn(consul_client.watch_health,
consul_server)
+ if not health_thread then
+ thread_kill(catalog_thread)
+ local random_delay = math_random(default_random_range)
+ log.error("failed to spawn thread watch health: ", err,
+ ", retry connecting consul after ", random_delay, " seconds")
+ core_sleep(random_delay)
- return watch_type_catalog, default_catalog_error_index
+ check_keepalive(reg, consul_server, retry_delay)
+ return
end
- if consul_server.catalog_index > 0
- and consul_server.catalog_index ==
tonumber(watch_result.headers['X-Consul-Index']) then
+ local thread_wait_ok, watch_type, index = thread_wait(catalog_thread,
health_thread)
+ thread_kill(catalog_thread)
+ thread_kill(health_thread)
+ if not thread_wait_ok then
local random_delay = math_random(default_random_range)
- log.info("watch catalog has no change, re-watch consul after ",
random_delay, " seconds")
+ log.error("failed to wait thread: ", watch_type, ", retry connecting
consul after ",
+ random_delay, " seconds")
core_sleep(random_delay)
- goto RETRY
+
+ check_keepalive(reg, consul_server, retry_delay)
+ return
end
- return watch_type_catalog, watch_result.headers['X-Consul-Index']
-end
+ if not consul_client.watch_result_is_valid(tonumber(watch_type),
+ tonumber(index), consul_server.catalog_index,
consul_server.health_index) then
+ retry_delay = consul_client.get_retry_delay(retry_delay)
+ log.warn("get all svcs got err, retry connecting consul after ",
retry_delay, " seconds")
+ core_sleep(retry_delay)
+ check_keepalive(reg, consul_server, retry_delay)
+ return
+ end
-local function watch_health(consul_server)
- local client = resty_consul:new(get_opts(consul_server, false))
+ if reg.stop_flag then
+ return
+ end
- ::RETRY::
- local watch_result, watch_err =
client:get(consul_server.consul_watch_health_url)
- local watch_error_info = (watch_err ~= nil and watch_err)
- or ((watch_result ~= nil and watch_result.status ~= 200)
- and watch_result.status)
- if watch_error_info then
- log.error("connect consul: ", consul_server.consul_server_url,
- " by sub url: ", consul_server.consul_watch_health_url,
- ", got watch result: ", json_delay_encode(watch_result),
- ", with error: ", watch_error_info)
+ local up_services, fetch_err, new_catalog_index, new_health_index =
+ consul_client.fetch_services_from_server(consul_server, {
+ default_weight = reg.conf.weight,
+ sort_type = reg.conf.sort_type,
+ skip_service_map = reg.skip_service_map,
+ preserve_metadata = reg.preserve_metadata,
+ key_builder = reg.key_builder,
+ })
- return watch_type_health, default_health_error_index
- end
+ if fetch_err then
+ retry_delay = consul_client.get_retry_delay(retry_delay)
+ log.warn("get all svcs got err: ", fetch_err,
+ ", retry connecting consul after ", retry_delay, " seconds")
+ core_sleep(retry_delay)
- if consul_server.health_index > 0
- and consul_server.health_index ==
tonumber(watch_result.headers['X-Consul-Index']) then
- local random_delay = math_random(default_random_range)
- log.info("watch health has no change, re-watch consul after ",
random_delay, " seconds")
- core_sleep(random_delay)
- goto RETRY
+ check_keepalive(reg, consul_server, retry_delay)
+ return
end
- return watch_type_health, watch_result.headers['X-Consul-Index']
-end
+ if reg.stop_flag then
+ return
+ end
+ -- only update if there are actual services (empty table means no index
change)
+ if next(up_services) then
+ update_all_services(reg, consul_server.consul_server_url, up_services)
-local function check_keepalive(consul_server, retry_delay)
- if consul_server.keepalive and not exiting() then
- local ok, err = ngx_timer_at(0, _M.connect, consul_server, retry_delay)
- if not ok then
- log.error("create ngx_timer_at got error: ", err)
- return
+ if reg.dump_params then
+ ngx_timer_at(0, write_dump_services, reg)
end
end
+
+ consul_client.update_index(consul_server, new_catalog_index,
new_health_index)
+ check_keepalive(reg, consul_server, retry_delay)
end
-local function update_index(consul_server, catalog_index, health_index)
- local c_index = 0
- local h_index = 0
- if catalog_index ~= nil then
- c_index = tonumber(catalog_index)
- end
+-- ─── Registry management API ──────────────────────────────────────────
- if health_index ~= nil then
- h_index = tonumber(health_index)
+--- Create a consul registry instance.
+---
+--- conf fields: id, servers (array of URLs), token, timeout, weight,
+--- keepalive, fetch_interval, sort_type, skip_services,
+--- dump, default_service, shared_size
+---
+--- options: service_scanner (function), preserve_metadata (bool),
+--- key_builder (function(service_name) -> string)
+function _M.create_registry(conf, options)
+ options = options or {}
+ local id = conf.id
+ if not id or id == "" then
+ return nil, "registry id is required"
end
- if c_index > 0 then
- consul_server.catalog_index = c_index
+ if registries[id] then
+ _M.stop_registry(id)
end
- if h_index > 0 then
- consul_server.health_index = h_index
+ -- build skip service map
+ local skip_map = core.table.new(0, 1)
+ if conf.skip_services then
+ for _, v in ipairs(conf.skip_services) do
+ skip_map[v] = true
+ end
end
+ for _, v in ipairs(default_skip_services) do
+ skip_map[v] = true
+ end
+
+ -- build default_service
+ local default_svc
+ if conf.default_service then
+ default_svc = conf.default_service
Review Comment:
`init_worker()` makes a shallow copy of `consul_conf` “to avoid mutating
cached config”, but `create_registry()` later sets
`conf.default_service.weight`. Because the copy is shallow,
`conf.default_service` is still the same nested table as in `local_conf`, so
the cached config is mutated anyway. If the intention is to avoid mutating
`local_conf`, clone `default_service` before setting `weight` (or update the
comment to match the actual behavior).
```suggestion
default_svc = {}
for k, v in pairs(conf.default_service) do
default_svc[k] = v
end
```
##########
apisix/discovery/consul/init.lua:
##########
@@ -252,496 +223,410 @@ local function show_dump_file()
end
-local function get_retry_delay(retry_delay)
- if not retry_delay or retry_delay >= max_retry_time then
- retry_delay = 1
- else
- retry_delay = retry_delay * 4
+-- ─── polling loop ─────────────────────────────────────────────────────
+
+local function check_keepalive(reg, consul_server, retry_delay)
+ if reg.stop_flag then
+ return
end
- return retry_delay
+ if consul_server.keepalive and not exiting() then
+ local ok, err = ngx_timer_at(0, _M.connect, reg, consul_server,
retry_delay)
+ if not ok then
+ log.error("create ngx_timer_at got error: ", err)
+ return
+ end
+ end
end
-local function get_opts(consul_server, is_catalog)
- local opts = {
- host = consul_server.host,
- port = consul_server.port,
- connect_timeout = consul_server.connect_timeout,
- read_timeout = consul_server.read_timeout,
- default_args = {
- token = consul_server.token,
- }
- }
- if not consul_server.keepalive then
- return opts
+function _M.connect(premature, reg, consul_server, retry_delay)
+ if premature or reg.stop_flag then
+ return
end
- opts.default_args.wait = consul_server.wait_timeout --blocked wait!=0;
unblocked by wait=0
+ local catalog_thread, spawn_catalog_err =
thread_spawn(consul_client.watch_catalog,
+ consul_server)
+ if not catalog_thread then
+ local random_delay = math_random(default_random_range)
+ log.error("failed to spawn thread watch catalog: ", spawn_catalog_err,
+ ", retry connecting consul after ", random_delay, " seconds")
+ core_sleep(random_delay)
- if is_catalog then
- opts.default_args.index = consul_server.catalog_index
- else
- opts.default_args.index = consul_server.health_index
+ check_keepalive(reg, consul_server, retry_delay)
+ return
end
- return opts
-end
-
-
-local function watch_catalog(consul_server)
- local client = resty_consul:new(get_opts(consul_server, true))
-
- ::RETRY::
- local watch_result, watch_err =
client:get(consul_server.consul_watch_catalog_url)
- local watch_error_info = (watch_err ~= nil and watch_err)
- or ((watch_result ~= nil and watch_result.status
~= 200)
- and watch_result.status)
- if watch_error_info then
- log.error("connect consul: ", consul_server.consul_server_url,
- " by sub url: ", consul_server.consul_watch_catalog_url,
- ", got watch result: ", json_delay_encode(watch_result),
- ", with error: ", watch_error_info)
+ local health_thread, err = thread_spawn(consul_client.watch_health,
consul_server)
+ if not health_thread then
+ thread_kill(catalog_thread)
+ local random_delay = math_random(default_random_range)
+ log.error("failed to spawn thread watch health: ", err,
+ ", retry connecting consul after ", random_delay, " seconds")
+ core_sleep(random_delay)
- return watch_type_catalog, default_catalog_error_index
+ check_keepalive(reg, consul_server, retry_delay)
+ return
end
- if consul_server.catalog_index > 0
- and consul_server.catalog_index ==
tonumber(watch_result.headers['X-Consul-Index']) then
+ local thread_wait_ok, watch_type, index = thread_wait(catalog_thread,
health_thread)
+ thread_kill(catalog_thread)
+ thread_kill(health_thread)
+ if not thread_wait_ok then
local random_delay = math_random(default_random_range)
- log.info("watch catalog has no change, re-watch consul after ",
random_delay, " seconds")
+ log.error("failed to wait thread: ", watch_type, ", retry connecting
consul after ",
+ random_delay, " seconds")
core_sleep(random_delay)
- goto RETRY
+
+ check_keepalive(reg, consul_server, retry_delay)
+ return
end
- return watch_type_catalog, watch_result.headers['X-Consul-Index']
-end
+ if not consul_client.watch_result_is_valid(tonumber(watch_type),
+ tonumber(index), consul_server.catalog_index,
consul_server.health_index) then
+ retry_delay = consul_client.get_retry_delay(retry_delay)
+ log.warn("get all svcs got err, retry connecting consul after ",
retry_delay, " seconds")
+ core_sleep(retry_delay)
+ check_keepalive(reg, consul_server, retry_delay)
+ return
+ end
-local function watch_health(consul_server)
- local client = resty_consul:new(get_opts(consul_server, false))
+ if reg.stop_flag then
+ return
+ end
- ::RETRY::
- local watch_result, watch_err =
client:get(consul_server.consul_watch_health_url)
- local watch_error_info = (watch_err ~= nil and watch_err)
- or ((watch_result ~= nil and watch_result.status ~= 200)
- and watch_result.status)
- if watch_error_info then
- log.error("connect consul: ", consul_server.consul_server_url,
- " by sub url: ", consul_server.consul_watch_health_url,
- ", got watch result: ", json_delay_encode(watch_result),
- ", with error: ", watch_error_info)
+ local up_services, fetch_err, new_catalog_index, new_health_index =
+ consul_client.fetch_services_from_server(consul_server, {
+ default_weight = reg.conf.weight,
+ sort_type = reg.conf.sort_type,
+ skip_service_map = reg.skip_service_map,
+ preserve_metadata = reg.preserve_metadata,
+ key_builder = reg.key_builder,
+ })
- return watch_type_health, default_health_error_index
- end
+ if fetch_err then
+ retry_delay = consul_client.get_retry_delay(retry_delay)
+ log.warn("get all svcs got err: ", fetch_err,
+ ", retry connecting consul after ", retry_delay, " seconds")
+ core_sleep(retry_delay)
- if consul_server.health_index > 0
- and consul_server.health_index ==
tonumber(watch_result.headers['X-Consul-Index']) then
- local random_delay = math_random(default_random_range)
- log.info("watch health has no change, re-watch consul after ",
random_delay, " seconds")
- core_sleep(random_delay)
- goto RETRY
+ check_keepalive(reg, consul_server, retry_delay)
+ return
end
- return watch_type_health, watch_result.headers['X-Consul-Index']
-end
+ if reg.stop_flag then
+ return
+ end
+ -- only update if there are actual services (empty table means no index
change)
+ if next(up_services) then
+ update_all_services(reg, consul_server.consul_server_url, up_services)
-local function check_keepalive(consul_server, retry_delay)
- if consul_server.keepalive and not exiting() then
- local ok, err = ngx_timer_at(0, _M.connect, consul_server, retry_delay)
- if not ok then
- log.error("create ngx_timer_at got error: ", err)
- return
+ if reg.dump_params then
+ ngx_timer_at(0, write_dump_services, reg)
end
end
+
+ consul_client.update_index(consul_server, new_catalog_index,
new_health_index)
+ check_keepalive(reg, consul_server, retry_delay)
end
-local function update_index(consul_server, catalog_index, health_index)
- local c_index = 0
- local h_index = 0
- if catalog_index ~= nil then
- c_index = tonumber(catalog_index)
- end
+-- ─── Registry management API ──────────────────────────────────────────
- if health_index ~= nil then
- h_index = tonumber(health_index)
+--- Create a consul registry instance.
+---
+--- conf fields: id, servers (array of URLs), token, timeout, weight,
+--- keepalive, fetch_interval, sort_type, skip_services,
+--- dump, default_service, shared_size
+---
+--- options: service_scanner (function), preserve_metadata (bool),
+--- key_builder (function(service_name) -> string)
+function _M.create_registry(conf, options)
+ options = options or {}
+ local id = conf.id
+ if not id or id == "" then
+ return nil, "registry id is required"
end
- if c_index > 0 then
- consul_server.catalog_index = c_index
+ if registries[id] then
+ _M.stop_registry(id)
end
- if h_index > 0 then
- consul_server.health_index = h_index
+ -- build skip service map
+ local skip_map = core.table.new(0, 1)
+ if conf.skip_services then
+ for _, v in ipairs(conf.skip_services) do
+ skip_map[v] = true
+ end
end
+ for _, v in ipairs(default_skip_services) do
+ skip_map[v] = true
+ end
+
+ -- build default_service
+ local default_svc
+ if conf.default_service then
+ default_svc = conf.default_service
+ default_svc.weight = conf.weight
+ end
+
+ local reg = {
+ id = id,
+ conf = conf,
+ stop_flag = false,
+ preserve_metadata = options.preserve_metadata or false,
+ key_builder = options.key_builder or default_key_builder(id),
+ service_scanner = options.service_scanner or
consul_client.get_consul_services,
+ skip_service_map = skip_map,
+ default_service = default_svc,
+ dump_params = conf.dump,
+ consul_services = core.table.new(0, 1),
+ }
+
+ registries[id] = reg
+ return reg
end
-local function is_not_empty(value)
- if value == nil or value == null
- or (type(value) == "table" and not next(value))
- or (type(value) == "string" and value == "")
- then
- return false
+function _M.start_registry(reg)
+ local dict = get_dict()
+ if not dict then
+ error('lua_shared_dict "' .. dict_name .. '" not configured')
end
- return true
-end
+ -- flush stale data for this registry
+ local prefix = reg.id .. "/"
+ local all_keys = dict:get_keys(0)
+ for _, key in ipairs(all_keys) do
+ if core.string.has_prefix(key, prefix) then
+ dict:delete(key)
+ end
+ end
+ -- load dump file if configured
+ if reg.dump_params and reg.dump_params.load_on_init then
+ read_dump_services(reg)
+ end
-local function watch_result_is_valid(watch_type, index, catalog_index,
health_index)
- if index <= 0 then
- return false
+ local consul_servers_list, err =
consul_client.format_consul_params(reg.conf)
+ if err then
+ error("format consul config got error: " .. err)
end
+ log.info("consul_server_list: ", json_delay_encode(consul_servers_list,
true))
- if watch_type == watch_type_catalog then
- if index == catalog_index then
- return false
+ for _, server in ipairs(consul_servers_list) do
+ local ok, timer_err = ngx_timer_at(0, _M.connect, reg, server)
+ if not ok then
+ error("create consul got error: " .. timer_err)
end
- else
- if index == health_index then
- return false
+
+ if server.keepalive == false then
+ ngx_timer_every(server.fetch_interval, _M.connect, reg, server)
end
end
-
- return true
end
-local function combine_sort_nodes_cmp(left, right)
- if left.host ~= right.host then
- return left.host < right.host
+function _M.stop_registry(id)
+ local reg = registries[id]
+ if not reg then
+ return
end
- return left.port < right.port
-end
-
+ reg.stop_flag = true
+ registries[id] = nil
-local function port_sort_nodes_cmp(left, right)
- return left.port < right.port
+ local dict = get_dict()
+ if dict then
+ local prefix = id .. "/"
+ local all_keys = dict:get_keys(0)
+ for _, key in ipairs(all_keys) do
+ if core.string.has_prefix(key, prefix) then
+ dict:delete(key)
+ end
+ end
+ end
end
-local function host_sort_nodes_cmp(left, right)
- return left.host < right.host
+function _M.get_registry(id)
+ return registries[id]
end
-function _M.connect(premature, consul_server, retry_delay)
- if premature then
- return
- end
-
- local catalog_thread, spawn_catalog_err = thread_spawn(watch_catalog,
consul_server)
- if not catalog_thread then
- local random_delay = math_random(default_random_range)
- log.error("failed to spawn thread watch catalog: ", spawn_catalog_err,
- ", retry connecting consul after ", random_delay, " seconds")
- core_sleep(random_delay)
+-- ─── Shared helpers ──────────────────────────────────────────────────
- check_keepalive(consul_server, retry_delay)
- return
+local function match_metadata(node_metadata, upstream_metadata)
+ if upstream_metadata == nil then
+ return true
end
- local health_thread, err = thread_spawn(watch_health, consul_server)
- if not health_thread then
- thread_kill(catalog_thread)
- local random_delay = math_random(default_random_range)
- log.error("failed to spawn thread watch health: ", err, ", retry
connecting consul after ",
- random_delay, " seconds")
- core_sleep(random_delay)
-
- check_keepalive(consul_server, retry_delay)
- return
+ if not node_metadata then
+ node_metadata = {}
end
- local thread_wait_ok, watch_type, index = thread_wait(catalog_thread,
health_thread)
- thread_kill(catalog_thread)
- thread_kill(health_thread)
- if not thread_wait_ok then
- local random_delay = math_random(default_random_range)
- log.error("failed to wait thread: ", watch_type, ", retry connecting
consul after ",
- random_delay, " seconds")
- core_sleep(random_delay)
-
- check_keepalive(consul_server, retry_delay)
- return
+ for k, v in pairs(upstream_metadata) do
+ if not node_metadata[k] or node_metadata[k] ~= v then
+ return false
+ end
end
- -- double check index has changed
- if not watch_result_is_valid(tonumber(watch_type),
- tonumber(index), consul_server.catalog_index,
consul_server.health_index) then
- retry_delay = get_retry_delay(retry_delay)
- log.warn("get all svcs got err, retry connecting consul after ",
retry_delay, " seconds")
- core_sleep(retry_delay)
+ return true
+end
- check_keepalive(consul_server, retry_delay)
- return
- end
- local consul_client = resty_consul:new({
- host = consul_server.host,
- port = consul_server.port,
- connect_timeout = consul_server.connect_timeout,
- read_timeout = consul_server.read_timeout,
- default_args = {
- token = consul_server.token
- }
- })
- local catalog_success, catalog_res, catalog_err = pcall(function()
- return consul_client:get(consul_server.consul_watch_catalog_url)
- end)
- if not catalog_success then
- log.error("connect consul: ", consul_server.consul_server_url,
- " by sub url: ", consul_server.consul_watch_catalog_url,
- ", got catalog result: ", json_delay_encode(catalog_res))
- check_keepalive(consul_server, retry_delay)
- return
+function _M.get_nodes(key, metadata)
+ local dict = get_dict()
+ if not dict then
+ return nil
end
- local catalog_error_info = (catalog_err ~= nil and catalog_err)
- or ((catalog_res ~= nil and catalog_res.status ~= 200)
- and catalog_res.status)
- if catalog_error_info then
- log.error("connect consul: ", consul_server.consul_server_url,
- " by sub url: ", consul_server.consul_watch_catalog_url,
- ", got catalog result: ", json_delay_encode(catalog_res),
- ", with error: ", catalog_error_info)
-
- retry_delay = get_retry_delay(retry_delay)
- log.warn("get all svcs got err, retry connecting consul after ",
retry_delay, " seconds")
- core_sleep(retry_delay)
- check_keepalive(consul_server, retry_delay)
- return
+ local value = dict:get(key)
+ if not value then
+ return nil
end
- -- get health index
- local success, health_res, health_err = pcall(function()
- return consul_client:get(consul_server.consul_watch_health_url)
- end)
- if not success then
- log.error("connect consul: ", consul_server.consul_server_url,
- " by sub url: ", consul_server.consul_watch_health_url,
- ", got health result: ", json_delay_encode(health_res))
- check_keepalive(consul_server, retry_delay)
- return
+ local nodes = core.json.decode(value)
+ if not metadata then
+ return nodes
end
- local health_error_info = (health_err ~= nil and health_err)
- or ((health_res ~= nil and health_res.status ~= 200)
- and health_res.status)
- if health_error_info then
- log.error("connect consul: ", consul_server.consul_server_url,
- " by sub url: ", consul_server.consul_watch_health_url,
- ", got health result: ", json_delay_encode(health_res),
- ", with error: ", health_error_info)
-
- retry_delay = get_retry_delay(retry_delay)
- log.warn("get all svcs got err, retry connecting consul after ",
retry_delay, " seconds")
- core_sleep(retry_delay)
- check_keepalive(consul_server, retry_delay)
- return
+ local res = {}
+ for _, node in ipairs(nodes) do
+ if match_metadata(node.metadata, metadata) then
+ core.table.insert(res, node)
+ end
end
+ return res
+end
- log.info("connect consul: ", consul_server.consul_server_url,
- ", catalog_result status: ", catalog_res.status,
- ", catalog_result.headers.index: ",
catalog_res.headers['X-Consul-Index'],
- ", consul_server.index: ", consul_server.index,
- ", consul_server: ", json_delay_encode(consul_server))
-
- -- if the current index is different from the last index, then update the
service
- if (consul_server.catalog_index ~=
tonumber(catalog_res.headers['X-Consul-Index']))
- or (consul_server.health_index ~=
tonumber(health_res.headers['X-Consul-Index'])) then
- local up_services = core.table.new(0, #catalog_res.body)
- for service_name, _ in pairs(catalog_res.body) do
- -- check if the service_name is 'skip service'
- if skip_service_map[service_name] then
- goto CONTINUE
- end
- -- get node from service
- local svc_url = consul_server.consul_sub_url .. "/" .. service_name
- local svc_success, result, get_err = pcall(function()
- return consul_client:get(svc_url, {passing = true})
- end)
- local error_info = (get_err ~= nil and get_err) or
- ((result ~= nil and result.status ~= 200) and
result.status)
- if not svc_success or error_info then
- log.error("connect consul: ", consul_server.consul_server_url,
- ", by service url: ", svc_url, ", with error: ",
error_info)
- goto CONTINUE
- end
+-- ─── Standard discovery interface ─────────────────────────────────────
- -- decode body, decode json, update service, error handling
- -- check result body is not nil and not empty
- if is_not_empty(result.body) then
- -- add services to table
- local nodes = up_services[service_name]
- local nodes_uniq = {}
- for _, node in ipairs(result.body) do
- if not node.Service then
- goto CONTINUE
- end
-
- local svc_address, svc_port = node.Service.Address,
node.Service.Port
- -- Handle nil or 0 port case - default to 80 for HTTP
services
- if not svc_port or svc_port == 0 then
- svc_port = 80
- end
- -- if nodes is nil, new nodes table and set to up_services
- if not nodes then
- nodes = core.table.new(1, 0)
- up_services[service_name] = nodes
- end
- -- not store duplicate service IDs.
- local service_id = svc_address .. ":" .. svc_port
- if not nodes_uniq[service_id] then
- -- add node to nodes table
- core.table.insert(nodes, {
- host = svc_address,
- port = tonumber(svc_port),
- weight = default_weight,
- })
- nodes_uniq[service_id] = true
- end
- end
- if nodes then
- if sort_type == "port_sort" then
- core.table.sort(nodes, port_sort_nodes_cmp)
+local function fetch_node_from_shdict(service_name)
+ local dict = get_dict()
+ if not dict then
+ return nil, "consul shared dict not available"
+ end
- elseif sort_type == "host_sort" then
- core.table.sort(nodes, host_sort_nodes_cmp)
+ -- look up with the default registry prefix
+ local value = dict:get("default/" .. service_name)
+ if not value then
+ return nil, "consul service not found: " .. service_name
+ end
- elseif sort_type == "combine_sort" then
- core.table.sort(nodes, combine_sort_nodes_cmp)
+ local nodes, err = core.json.decode(value)
+ if not nodes then
+ return nil, "failed to decode nodes for service: "
+ .. service_name .. ", error: " .. (err or "")
+ end
- end
- end
- up_services[service_name] = nodes
- end
- :: CONTINUE ::
- end
+ return nodes
+end
- update_all_services(consul_server.consul_server_url, up_services)
- if dump_params then
- ngx_timer_at(0, write_dump_services)
- end
+function _M.nodes(service_name)
+ local default_reg = registries["default"]
+ local default_svc = default_reg and default_reg.default_service
- update_index(consul_server,
- catalog_res.headers['X-Consul-Index'],
- health_res.headers['X-Consul-Index'])
+ local nodes, err = nodes_cache(service_name, nil,
+ fetch_node_from_shdict, service_name)
+ if not nodes then
+ log.error("fetch nodes failed by ", service_name, ", error: ", err)
+ return default_svc and {default_svc}
end
- check_keepalive(consul_server, retry_delay)
+ log.info("process id: ", ngx_worker_id(), ", [", service_name, "] = ",
+ json_delay_encode(nodes, true))
+
+ return nodes
end
-local function format_consul_params(consul_conf)
- local consul_server_list = core.table.new(0, #consul_conf.servers)
+function _M.all_nodes()
+ local dict = get_dict()
+ if not dict then
+ return {}
+ end
+
+ local keys = dict:get_keys(0)
+ local services = core.table.new(0, #keys)
+ local prefix = "default/"
+ for i, key in ipairs(keys) do
+ -- only return default registry nodes, strip prefix for BC
+ if core.string.has_prefix(key, prefix) then
+ local value = dict:get(key)
+ if value then
+ local nodes, err = core.json.decode(value)
+ if nodes then
+ local bare_key = key:sub(#prefix + 1)
+ services[bare_key] = nodes
+ else
+ log.error("failed to decode nodes for service: ", key, ",
error: ", err)
+ end
+ end
+ end
- for _, v in pairs(consul_conf.servers) do
- local scheme, host, port, path = unpack(http.parse_uri(nil, v))
- if scheme ~= "http" then
- return nil, "only support consul http schema address, eg:
http://address:port"
- elseif path ~= "/" or core.string.has_suffix(v, '/') then
- return nil, "invalid consul server address, the valid format:
http://address:port"
+ if i % 100 == 0 then
+ ngx.sleep(0)
end
- core.table.insert(consul_server_list, {
- host = host,
- port = port,
- token = consul_conf.token,
- connect_timeout = consul_conf.timeout.connect,
- read_timeout = consul_conf.timeout.read,
- wait_timeout = consul_conf.timeout.wait,
- consul_watch_catalog_url = "/catalog/services",
- consul_sub_url = "/health/service",
- consul_watch_health_url = "/health/state/any",
- consul_server_url = v .. "/v1",
- weight = consul_conf.weight,
- keepalive = consul_conf.keepalive,
- health_index = 0,
- catalog_index = 0,
- fetch_interval = consul_conf.fetch_interval -- fetch interval to
next connect consul
- })
end
- return consul_server_list, nil
+ return services
end
+-- ─── Initialization ───────────────────────────────────────────────────
+
function _M.init_worker()
- local consul_conf = local_conf.discovery.consul
- dump_params = consul_conf.dump
+ local consul_conf = local_conf.discovery and local_conf.discovery.consul
+ if not consul_conf then
+ return
+ end
- default_weight = consul_conf.weight
- sort_type = consul_conf.sort_type
- -- set default service, used when the server node cannot be found
- if consul_conf.default_service then
- default_service = consul_conf.default_service
- default_service.weight = default_weight
+ local dict = ngx.shared[dict_name]
+ if not dict then
+ error('lua_shared_dict "' .. dict_name .. '" not configured')
end
+ consul_dict = dict
if process.type() ~= "privileged agent" then
return
end
- -- flush stale data that may persist across reloads,
- -- since consul_services is re-initialized empty
- consul_dict:flush_all()
-
- if consul_conf.dump then
- if consul_conf.dump.load_on_init then
- read_dump_services()
- end
- end
+ -- flush stale data that may persist across reloads
+ dict:flush_all()
log.notice("consul_conf: ", json_delay_encode(consul_conf, true))
- if consul_conf.skip_services then
- skip_service_map = core.table.new(0, #consul_conf.skip_services)
- for _, v in ipairs(consul_conf.skip_services) do
- skip_service_map[v] = true
- end
- end
- -- set up default skip service
- for _, v in ipairs(default_skip_services) do
- skip_service_map[v] = true
- end
-
- local consul_servers_list, err = format_consul_params(consul_conf)
- if err then
- error("format consul config got error: " .. err)
+ -- shallow copy to avoid mutating cached config
+ local conf = {}
+ for k, v in pairs(consul_conf) do
+ conf[k] = v
end
- log.info("consul_server_list: ", json_delay_encode(consul_servers_list,
true))
-
- consul_services = core.table.new(0, 1)
- -- success or failure
- for _, server in ipairs(consul_servers_list) do
- local ok, err = ngx_timer_at(0, _M.connect, server)
- if not ok then
- error("create consul got error: " .. err)
- end
+ conf.id = "default"
- if server.keepalive == false then
- ngx_timer_every(server.fetch_interval, _M.connect, server)
- end
- end
+ local reg = _M.create_registry(conf)
+ _M.start_registry(reg)
end
function _M.dump_data()
- return {config = local_conf.discovery.consul, services = _M.all_nodes()}
+ return {config = local_conf.discovery and local_conf.discovery.consul,
+ services = _M.all_nodes()}
end
function _M.control_api()
+ local default_reg = registries["default"]
return {
{
methods = {"GET"},
uris = {"/show_dump_file"},
- handler = show_dump_file,
+ handler = function()
+ return show_dump_file(default_reg)
+ end,
Review Comment:
`control_api()` captures `default_reg` once, when `control_api()` itself is
called, rather than when the handler runs. If `control_api()` is evaluated
before `init_worker()` creates the registry, the handler will permanently close
over `nil` and always return “dump params is nil”. Resolve the registry inside
the handler (or call `get_registry()` there) to avoid stale captures.
##########
apisix/discovery/consul/init.lua:
##########
@@ -252,496 +223,410 @@ local function show_dump_file()
end
-local function get_retry_delay(retry_delay)
- if not retry_delay or retry_delay >= max_retry_time then
- retry_delay = 1
- else
- retry_delay = retry_delay * 4
+-- ─── polling loop ─────────────────────────────────────────────────────
+
+local function check_keepalive(reg, consul_server, retry_delay)
+ if reg.stop_flag then
+ return
end
- return retry_delay
+ if consul_server.keepalive and not exiting() then
+ local ok, err = ngx_timer_at(0, _M.connect, reg, consul_server,
retry_delay)
+ if not ok then
+ log.error("create ngx_timer_at got error: ", err)
+ return
+ end
+ end
end
-local function get_opts(consul_server, is_catalog)
- local opts = {
- host = consul_server.host,
- port = consul_server.port,
- connect_timeout = consul_server.connect_timeout,
- read_timeout = consul_server.read_timeout,
- default_args = {
- token = consul_server.token,
- }
- }
- if not consul_server.keepalive then
- return opts
+function _M.connect(premature, reg, consul_server, retry_delay)
+ if premature or reg.stop_flag then
+ return
end
- opts.default_args.wait = consul_server.wait_timeout --blocked wait!=0;
unblocked by wait=0
+ local catalog_thread, spawn_catalog_err =
thread_spawn(consul_client.watch_catalog,
+ consul_server)
+ if not catalog_thread then
+ local random_delay = math_random(default_random_range)
+ log.error("failed to spawn thread watch catalog: ", spawn_catalog_err,
+ ", retry connecting consul after ", random_delay, " seconds")
+ core_sleep(random_delay)
- if is_catalog then
- opts.default_args.index = consul_server.catalog_index
- else
- opts.default_args.index = consul_server.health_index
+ check_keepalive(reg, consul_server, retry_delay)
+ return
end
- return opts
-end
-
-
-local function watch_catalog(consul_server)
- local client = resty_consul:new(get_opts(consul_server, true))
-
- ::RETRY::
- local watch_result, watch_err =
client:get(consul_server.consul_watch_catalog_url)
- local watch_error_info = (watch_err ~= nil and watch_err)
- or ((watch_result ~= nil and watch_result.status
~= 200)
- and watch_result.status)
- if watch_error_info then
- log.error("connect consul: ", consul_server.consul_server_url,
- " by sub url: ", consul_server.consul_watch_catalog_url,
- ", got watch result: ", json_delay_encode(watch_result),
- ", with error: ", watch_error_info)
+ local health_thread, err = thread_spawn(consul_client.watch_health,
consul_server)
+ if not health_thread then
+ thread_kill(catalog_thread)
+ local random_delay = math_random(default_random_range)
+ log.error("failed to spawn thread watch health: ", err,
+ ", retry connecting consul after ", random_delay, " seconds")
+ core_sleep(random_delay)
- return watch_type_catalog, default_catalog_error_index
+ check_keepalive(reg, consul_server, retry_delay)
+ return
end
- if consul_server.catalog_index > 0
- and consul_server.catalog_index ==
tonumber(watch_result.headers['X-Consul-Index']) then
+ local thread_wait_ok, watch_type, index = thread_wait(catalog_thread,
health_thread)
+ thread_kill(catalog_thread)
+ thread_kill(health_thread)
+ if not thread_wait_ok then
local random_delay = math_random(default_random_range)
- log.info("watch catalog has no change, re-watch consul after ",
random_delay, " seconds")
+ log.error("failed to wait thread: ", watch_type, ", retry connecting
consul after ",
+ random_delay, " seconds")
core_sleep(random_delay)
- goto RETRY
+
+ check_keepalive(reg, consul_server, retry_delay)
+ return
end
- return watch_type_catalog, watch_result.headers['X-Consul-Index']
-end
+ if not consul_client.watch_result_is_valid(tonumber(watch_type),
+ tonumber(index), consul_server.catalog_index,
consul_server.health_index) then
+ retry_delay = consul_client.get_retry_delay(retry_delay)
+ log.warn("get all svcs got err, retry connecting consul after ",
retry_delay, " seconds")
+ core_sleep(retry_delay)
+ check_keepalive(reg, consul_server, retry_delay)
+ return
+ end
-local function watch_health(consul_server)
- local client = resty_consul:new(get_opts(consul_server, false))
+ if reg.stop_flag then
+ return
+ end
- ::RETRY::
- local watch_result, watch_err =
client:get(consul_server.consul_watch_health_url)
- local watch_error_info = (watch_err ~= nil and watch_err)
- or ((watch_result ~= nil and watch_result.status ~= 200)
- and watch_result.status)
- if watch_error_info then
- log.error("connect consul: ", consul_server.consul_server_url,
- " by sub url: ", consul_server.consul_watch_health_url,
- ", got watch result: ", json_delay_encode(watch_result),
- ", with error: ", watch_error_info)
+ local up_services, fetch_err, new_catalog_index, new_health_index =
+ consul_client.fetch_services_from_server(consul_server, {
+ default_weight = reg.conf.weight,
+ sort_type = reg.conf.sort_type,
+ skip_service_map = reg.skip_service_map,
+ preserve_metadata = reg.preserve_metadata,
+ key_builder = reg.key_builder,
+ })
- return watch_type_health, default_health_error_index
- end
+ if fetch_err then
+ retry_delay = consul_client.get_retry_delay(retry_delay)
+ log.warn("get all svcs got err: ", fetch_err,
+ ", retry connecting consul after ", retry_delay, " seconds")
+ core_sleep(retry_delay)
- if consul_server.health_index > 0
- and consul_server.health_index ==
tonumber(watch_result.headers['X-Consul-Index']) then
- local random_delay = math_random(default_random_range)
- log.info("watch health has no change, re-watch consul after ",
random_delay, " seconds")
- core_sleep(random_delay)
- goto RETRY
+ check_keepalive(reg, consul_server, retry_delay)
+ return
end
- return watch_type_health, watch_result.headers['X-Consul-Index']
-end
+ if reg.stop_flag then
+ return
+ end
+ -- only update if there are actual services (empty table means no index
change)
+ if next(up_services) then
+ update_all_services(reg, consul_server.consul_server_url, up_services)
-local function check_keepalive(consul_server, retry_delay)
- if consul_server.keepalive and not exiting() then
- local ok, err = ngx_timer_at(0, _M.connect, consul_server, retry_delay)
- if not ok then
- log.error("create ngx_timer_at got error: ", err)
- return
+ if reg.dump_params then
+ ngx_timer_at(0, write_dump_services, reg)
end
end
+
+ consul_client.update_index(consul_server, new_catalog_index,
new_health_index)
+ check_keepalive(reg, consul_server, retry_delay)
end
-local function update_index(consul_server, catalog_index, health_index)
- local c_index = 0
- local h_index = 0
- if catalog_index ~= nil then
- c_index = tonumber(catalog_index)
- end
+-- ─── Registry management API ──────────────────────────────────────────
- if health_index ~= nil then
- h_index = tonumber(health_index)
+--- Create a consul registry instance.
+---
+--- conf fields: id, servers (array of URLs), token, timeout, weight,
+--- keepalive, fetch_interval, sort_type, skip_services,
+--- dump, default_service, shared_size
+---
+--- options: service_scanner (function), preserve_metadata (bool),
+--- key_builder (function(service_name) -> string)
+function _M.create_registry(conf, options)
+ options = options or {}
+ local id = conf.id
+ if not id or id == "" then
+ return nil, "registry id is required"
end
- if c_index > 0 then
- consul_server.catalog_index = c_index
+ if registries[id] then
+ _M.stop_registry(id)
end
- if h_index > 0 then
- consul_server.health_index = h_index
+ -- build skip service map
+ local skip_map = core.table.new(0, 1)
+ if conf.skip_services then
+ for _, v in ipairs(conf.skip_services) do
+ skip_map[v] = true
+ end
end
+ for _, v in ipairs(default_skip_services) do
+ skip_map[v] = true
+ end
+
+ -- build default_service
+ local default_svc
+ if conf.default_service then
+ default_svc = conf.default_service
+ default_svc.weight = conf.weight
+ end
+
+ local reg = {
+ id = id,
+ conf = conf,
+ stop_flag = false,
+ preserve_metadata = options.preserve_metadata or false,
+ key_builder = options.key_builder or default_key_builder(id),
+ service_scanner = options.service_scanner or
consul_client.get_consul_services,
+ skip_service_map = skip_map,
+ default_service = default_svc,
+ dump_params = conf.dump,
+ consul_services = core.table.new(0, 1),
+ }
+
+ registries[id] = reg
+ return reg
end
-local function is_not_empty(value)
- if value == nil or value == null
- or (type(value) == "table" and not next(value))
- or (type(value) == "string" and value == "")
- then
- return false
+function _M.start_registry(reg)
+ local dict = get_dict()
+ if not dict then
+ error('lua_shared_dict "' .. dict_name .. '" not configured')
end
- return true
-end
+ -- flush stale data for this registry
+ local prefix = reg.id .. "/"
+ local all_keys = dict:get_keys(0)
+ for _, key in ipairs(all_keys) do
+ if core.string.has_prefix(key, prefix) then
+ dict:delete(key)
+ end
+ end
+ -- load dump file if configured
+ if reg.dump_params and reg.dump_params.load_on_init then
+ read_dump_services(reg)
+ end
-local function watch_result_is_valid(watch_type, index, catalog_index,
health_index)
- if index <= 0 then
- return false
+ local consul_servers_list, err =
consul_client.format_consul_params(reg.conf)
+ if err then
+ error("format consul config got error: " .. err)
end
+ log.info("consul_server_list: ", json_delay_encode(consul_servers_list,
true))
- if watch_type == watch_type_catalog then
- if index == catalog_index then
- return false
+ for _, server in ipairs(consul_servers_list) do
+ local ok, timer_err = ngx_timer_at(0, _M.connect, reg, server)
+ if not ok then
+ error("create consul got error: " .. timer_err)
end
- else
- if index == health_index then
- return false
+
+ if server.keepalive == false then
+ ngx_timer_every(server.fetch_interval, _M.connect, reg, server)
end
end
-
- return true
end
-local function combine_sort_nodes_cmp(left, right)
- if left.host ~= right.host then
- return left.host < right.host
+function _M.stop_registry(id)
+ local reg = registries[id]
+ if not reg then
+ return
end
- return left.port < right.port
-end
-
+ reg.stop_flag = true
+ registries[id] = nil
-local function port_sort_nodes_cmp(left, right)
- return left.port < right.port
+ local dict = get_dict()
+ if dict then
+ local prefix = id .. "/"
+ local all_keys = dict:get_keys(0)
+ for _, key in ipairs(all_keys) do
+ if core.string.has_prefix(key, prefix) then
+ dict:delete(key)
+ end
+ end
+ end
end
-local function host_sort_nodes_cmp(left, right)
- return left.host < right.host
+function _M.get_registry(id)
+ return registries[id]
end
-function _M.connect(premature, consul_server, retry_delay)
- if premature then
- return
- end
-
- local catalog_thread, spawn_catalog_err = thread_spawn(watch_catalog,
consul_server)
- if not catalog_thread then
- local random_delay = math_random(default_random_range)
- log.error("failed to spawn thread watch catalog: ", spawn_catalog_err,
- ", retry connecting consul after ", random_delay, " seconds")
- core_sleep(random_delay)
+-- ─── Shared helpers ──────────────────────────────────────────────────
- check_keepalive(consul_server, retry_delay)
- return
+local function match_metadata(node_metadata, upstream_metadata)
+ if upstream_metadata == nil then
+ return true
end
- local health_thread, err = thread_spawn(watch_health, consul_server)
- if not health_thread then
- thread_kill(catalog_thread)
- local random_delay = math_random(default_random_range)
- log.error("failed to spawn thread watch health: ", err, ", retry
connecting consul after ",
- random_delay, " seconds")
- core_sleep(random_delay)
-
- check_keepalive(consul_server, retry_delay)
- return
+ if not node_metadata then
+ node_metadata = {}
end
- local thread_wait_ok, watch_type, index = thread_wait(catalog_thread,
health_thread)
- thread_kill(catalog_thread)
- thread_kill(health_thread)
- if not thread_wait_ok then
- local random_delay = math_random(default_random_range)
- log.error("failed to wait thread: ", watch_type, ", retry connecting
consul after ",
- random_delay, " seconds")
- core_sleep(random_delay)
-
- check_keepalive(consul_server, retry_delay)
- return
+ for k, v in pairs(upstream_metadata) do
+ if not node_metadata[k] or node_metadata[k] ~= v then
+ return false
+ end
end
- -- double check index has changed
- if not watch_result_is_valid(tonumber(watch_type),
- tonumber(index), consul_server.catalog_index,
consul_server.health_index) then
- retry_delay = get_retry_delay(retry_delay)
- log.warn("get all svcs got err, retry connecting consul after ",
retry_delay, " seconds")
- core_sleep(retry_delay)
+ return true
+end
- check_keepalive(consul_server, retry_delay)
- return
- end
- local consul_client = resty_consul:new({
- host = consul_server.host,
- port = consul_server.port,
- connect_timeout = consul_server.connect_timeout,
- read_timeout = consul_server.read_timeout,
- default_args = {
- token = consul_server.token
- }
- })
- local catalog_success, catalog_res, catalog_err = pcall(function()
- return consul_client:get(consul_server.consul_watch_catalog_url)
- end)
- if not catalog_success then
- log.error("connect consul: ", consul_server.consul_server_url,
- " by sub url: ", consul_server.consul_watch_catalog_url,
- ", got catalog result: ", json_delay_encode(catalog_res))
- check_keepalive(consul_server, retry_delay)
- return
+function _M.get_nodes(key, metadata)
+ local dict = get_dict()
+ if not dict then
+ return nil
end
- local catalog_error_info = (catalog_err ~= nil and catalog_err)
- or ((catalog_res ~= nil and catalog_res.status ~= 200)
- and catalog_res.status)
- if catalog_error_info then
- log.error("connect consul: ", consul_server.consul_server_url,
- " by sub url: ", consul_server.consul_watch_catalog_url,
- ", got catalog result: ", json_delay_encode(catalog_res),
- ", with error: ", catalog_error_info)
-
- retry_delay = get_retry_delay(retry_delay)
- log.warn("get all svcs got err, retry connecting consul after ",
retry_delay, " seconds")
- core_sleep(retry_delay)
- check_keepalive(consul_server, retry_delay)
- return
+ local value = dict:get(key)
+ if not value then
+ return nil
end
- -- get health index
- local success, health_res, health_err = pcall(function()
- return consul_client:get(consul_server.consul_watch_health_url)
- end)
- if not success then
- log.error("connect consul: ", consul_server.consul_server_url,
- " by sub url: ", consul_server.consul_watch_health_url,
- ", got health result: ", json_delay_encode(health_res))
- check_keepalive(consul_server, retry_delay)
- return
+ local nodes = core.json.decode(value)
+ if not metadata then
+ return nodes
end
- local health_error_info = (health_err ~= nil and health_err)
- or ((health_res ~= nil and health_res.status ~= 200)
- and health_res.status)
- if health_error_info then
- log.error("connect consul: ", consul_server.consul_server_url,
- " by sub url: ", consul_server.consul_watch_health_url,
- ", got health result: ", json_delay_encode(health_res),
- ", with error: ", health_error_info)
-
- retry_delay = get_retry_delay(retry_delay)
- log.warn("get all svcs got err, retry connecting consul after ",
retry_delay, " seconds")
- core_sleep(retry_delay)
- check_keepalive(consul_server, retry_delay)
- return
+ local res = {}
+ for _, node in ipairs(nodes) do
+ if match_metadata(node.metadata, metadata) then
+ core.table.insert(res, node)
+ end
end
+ return res
+end
- log.info("connect consul: ", consul_server.consul_server_url,
- ", catalog_result status: ", catalog_res.status,
- ", catalog_result.headers.index: ",
catalog_res.headers['X-Consul-Index'],
- ", consul_server.index: ", consul_server.index,
- ", consul_server: ", json_delay_encode(consul_server))
-
- -- if the current index is different from the last index, then update the
service
- if (consul_server.catalog_index ~=
tonumber(catalog_res.headers['X-Consul-Index']))
- or (consul_server.health_index ~=
tonumber(health_res.headers['X-Consul-Index'])) then
- local up_services = core.table.new(0, #catalog_res.body)
- for service_name, _ in pairs(catalog_res.body) do
- -- check if the service_name is 'skip service'
- if skip_service_map[service_name] then
- goto CONTINUE
- end
- -- get node from service
- local svc_url = consul_server.consul_sub_url .. "/" .. service_name
- local svc_success, result, get_err = pcall(function()
- return consul_client:get(svc_url, {passing = true})
- end)
- local error_info = (get_err ~= nil and get_err) or
- ((result ~= nil and result.status ~= 200) and
result.status)
- if not svc_success or error_info then
- log.error("connect consul: ", consul_server.consul_server_url,
- ", by service url: ", svc_url, ", with error: ",
error_info)
- goto CONTINUE
- end
+-- ─── Standard discovery interface ─────────────────────────────────────
- -- decode body, decode json, update service, error handling
- -- check result body is not nil and not empty
- if is_not_empty(result.body) then
- -- add services to table
- local nodes = up_services[service_name]
- local nodes_uniq = {}
- for _, node in ipairs(result.body) do
- if not node.Service then
- goto CONTINUE
- end
-
- local svc_address, svc_port = node.Service.Address,
node.Service.Port
- -- Handle nil or 0 port case - default to 80 for HTTP
services
- if not svc_port or svc_port == 0 then
- svc_port = 80
- end
- -- if nodes is nil, new nodes table and set to up_services
- if not nodes then
- nodes = core.table.new(1, 0)
- up_services[service_name] = nodes
- end
- -- not store duplicate service IDs.
- local service_id = svc_address .. ":" .. svc_port
- if not nodes_uniq[service_id] then
- -- add node to nodes table
- core.table.insert(nodes, {
- host = svc_address,
- port = tonumber(svc_port),
- weight = default_weight,
- })
- nodes_uniq[service_id] = true
- end
- end
- if nodes then
- if sort_type == "port_sort" then
- core.table.sort(nodes, port_sort_nodes_cmp)
+local function fetch_node_from_shdict(service_name)
+ local dict = get_dict()
+ if not dict then
+ return nil, "consul shared dict not available"
+ end
- elseif sort_type == "host_sort" then
- core.table.sort(nodes, host_sort_nodes_cmp)
+ -- look up with the default registry prefix
+ local value = dict:get("default/" .. service_name)
+ if not value then
+ return nil, "consul service not found: " .. service_name
+ end
- elseif sort_type == "combine_sort" then
- core.table.sort(nodes, combine_sort_nodes_cmp)
+ local nodes, err = core.json.decode(value)
+ if not nodes then
+ return nil, "failed to decode nodes for service: "
+ .. service_name .. ", error: " .. (err or "")
+ end
- end
- end
- up_services[service_name] = nodes
- end
- :: CONTINUE ::
- end
+ return nodes
+end
- update_all_services(consul_server.consul_server_url, up_services)
- if dump_params then
- ngx_timer_at(0, write_dump_services)
- end
+function _M.nodes(service_name)
+ local default_reg = registries["default"]
+ local default_svc = default_reg and default_reg.default_service
- update_index(consul_server,
- catalog_res.headers['X-Consul-Index'],
- health_res.headers['X-Consul-Index'])
+ local nodes, err = nodes_cache(service_name, nil,
+ fetch_node_from_shdict, service_name)
+ if not nodes then
+ log.error("fetch nodes failed by ", service_name, ", error: ", err)
+ return default_svc and {default_svc}
end
- check_keepalive(consul_server, retry_delay)
+ log.info("process id: ", ngx_worker_id(), ", [", service_name, "] = ",
+ json_delay_encode(nodes, true))
+
+ return nodes
end
-local function format_consul_params(consul_conf)
- local consul_server_list = core.table.new(0, #consul_conf.servers)
+function _M.all_nodes()
+ local dict = get_dict()
+ if not dict then
+ return {}
+ end
+
+ local keys = dict:get_keys(0)
+ local services = core.table.new(0, #keys)
+ local prefix = "default/"
+ for i, key in ipairs(keys) do
+ -- only return default registry nodes, strip prefix for BC
+ if core.string.has_prefix(key, prefix) then
+ local value = dict:get(key)
+ if value then
+ local nodes, err = core.json.decode(value)
+ if nodes then
+ local bare_key = key:sub(#prefix + 1)
+ services[bare_key] = nodes
+ else
+ log.error("failed to decode nodes for service: ", key, ",
error: ", err)
+ end
+ end
+ end
- for _, v in pairs(consul_conf.servers) do
- local scheme, host, port, path = unpack(http.parse_uri(nil, v))
- if scheme ~= "http" then
- return nil, "only support consul http schema address, eg:
http://address:port"
- elseif path ~= "/" or core.string.has_suffix(v, '/') then
- return nil, "invalid consul server address, the valid format:
http://address:port"
+ if i % 100 == 0 then
+ ngx.sleep(0)
end
- core.table.insert(consul_server_list, {
- host = host,
- port = port,
- token = consul_conf.token,
- connect_timeout = consul_conf.timeout.connect,
- read_timeout = consul_conf.timeout.read,
- wait_timeout = consul_conf.timeout.wait,
- consul_watch_catalog_url = "/catalog/services",
- consul_sub_url = "/health/service",
- consul_watch_health_url = "/health/state/any",
- consul_server_url = v .. "/v1",
- weight = consul_conf.weight,
- keepalive = consul_conf.keepalive,
- health_index = 0,
- catalog_index = 0,
- fetch_interval = consul_conf.fetch_interval -- fetch interval to
next connect consul
- })
end
- return consul_server_list, nil
+ return services
end
+-- ─── Initialization ───────────────────────────────────────────────────
+
function _M.init_worker()
- local consul_conf = local_conf.discovery.consul
- dump_params = consul_conf.dump
+ local consul_conf = local_conf.discovery and local_conf.discovery.consul
+ if not consul_conf then
+ return
+ end
- default_weight = consul_conf.weight
- sort_type = consul_conf.sort_type
- -- set default service, used when the server node cannot be found
- if consul_conf.default_service then
- default_service = consul_conf.default_service
- default_service.weight = default_weight
+ local dict = ngx.shared[dict_name]
+ if not dict then
+ error('lua_shared_dict "' .. dict_name .. '" not configured')
end
+ consul_dict = dict
if process.type() ~= "privileged agent" then
return
end
- -- flush stale data that may persist across reloads,
- -- since consul_services is re-initialized empty
- consul_dict:flush_all()
-
- if consul_conf.dump then
- if consul_conf.dump.load_on_init then
- read_dump_services()
- end
- end
+ -- flush stale data that may persist across reloads
+ dict:flush_all()
Review Comment:
`dict:flush_all()` wipes *all* Consul discovery keys, including keys that
belong to other registry IDs, which this refactor is intended to isolate via
`{id}/...` prefixes. Consider deleting only keys with the default registry
prefix (as `start_registry()` already does) so a statically-configured default
registry won’t clobber other registries in the same shared dict.
```suggestion
-- flush stale data for the default registry that may persist across reloads
local default_prefix = "default/"
for _, key in ipairs(dict:get_keys(0)) do
if key:find("^" .. default_prefix) then
dict:delete(key)
end
end
```
##########
apisix/discovery/consul/init.lua:
##########
@@ -252,496 +223,410 @@ local function show_dump_file()
end
-local function get_retry_delay(retry_delay)
- if not retry_delay or retry_delay >= max_retry_time then
- retry_delay = 1
- else
- retry_delay = retry_delay * 4
+-- ─── polling loop ─────────────────────────────────────────────────────
+
+local function check_keepalive(reg, consul_server, retry_delay)
+ if reg.stop_flag then
+ return
end
- return retry_delay
+ if consul_server.keepalive and not exiting() then
+ local ok, err = ngx_timer_at(0, _M.connect, reg, consul_server,
retry_delay)
+ if not ok then
+ log.error("create ngx_timer_at got error: ", err)
+ return
+ end
+ end
end
-local function get_opts(consul_server, is_catalog)
- local opts = {
- host = consul_server.host,
- port = consul_server.port,
- connect_timeout = consul_server.connect_timeout,
- read_timeout = consul_server.read_timeout,
- default_args = {
- token = consul_server.token,
- }
- }
- if not consul_server.keepalive then
- return opts
+function _M.connect(premature, reg, consul_server, retry_delay)
+ if premature or reg.stop_flag then
+ return
end
- opts.default_args.wait = consul_server.wait_timeout --blocked wait!=0;
unblocked by wait=0
+ local catalog_thread, spawn_catalog_err =
thread_spawn(consul_client.watch_catalog,
+ consul_server)
+ if not catalog_thread then
+ local random_delay = math_random(default_random_range)
+ log.error("failed to spawn thread watch catalog: ", spawn_catalog_err,
+ ", retry connecting consul after ", random_delay, " seconds")
+ core_sleep(random_delay)
- if is_catalog then
- opts.default_args.index = consul_server.catalog_index
- else
- opts.default_args.index = consul_server.health_index
+ check_keepalive(reg, consul_server, retry_delay)
+ return
end
- return opts
-end
-
-
-local function watch_catalog(consul_server)
- local client = resty_consul:new(get_opts(consul_server, true))
-
- ::RETRY::
- local watch_result, watch_err =
client:get(consul_server.consul_watch_catalog_url)
- local watch_error_info = (watch_err ~= nil and watch_err)
- or ((watch_result ~= nil and watch_result.status
~= 200)
- and watch_result.status)
- if watch_error_info then
- log.error("connect consul: ", consul_server.consul_server_url,
- " by sub url: ", consul_server.consul_watch_catalog_url,
- ", got watch result: ", json_delay_encode(watch_result),
- ", with error: ", watch_error_info)
+ local health_thread, err = thread_spawn(consul_client.watch_health,
consul_server)
+ if not health_thread then
+ thread_kill(catalog_thread)
+ local random_delay = math_random(default_random_range)
+ log.error("failed to spawn thread watch health: ", err,
+ ", retry connecting consul after ", random_delay, " seconds")
+ core_sleep(random_delay)
- return watch_type_catalog, default_catalog_error_index
+ check_keepalive(reg, consul_server, retry_delay)
+ return
end
- if consul_server.catalog_index > 0
- and consul_server.catalog_index ==
tonumber(watch_result.headers['X-Consul-Index']) then
+ local thread_wait_ok, watch_type, index = thread_wait(catalog_thread,
health_thread)
+ thread_kill(catalog_thread)
+ thread_kill(health_thread)
+ if not thread_wait_ok then
local random_delay = math_random(default_random_range)
- log.info("watch catalog has no change, re-watch consul after ",
random_delay, " seconds")
+ log.error("failed to wait thread: ", watch_type, ", retry connecting
consul after ",
+ random_delay, " seconds")
core_sleep(random_delay)
- goto RETRY
+
+ check_keepalive(reg, consul_server, retry_delay)
+ return
end
- return watch_type_catalog, watch_result.headers['X-Consul-Index']
-end
+ if not consul_client.watch_result_is_valid(tonumber(watch_type),
+ tonumber(index), consul_server.catalog_index,
consul_server.health_index) then
+ retry_delay = consul_client.get_retry_delay(retry_delay)
+ log.warn("get all svcs got err, retry connecting consul after ",
retry_delay, " seconds")
+ core_sleep(retry_delay)
+ check_keepalive(reg, consul_server, retry_delay)
+ return
+ end
-local function watch_health(consul_server)
- local client = resty_consul:new(get_opts(consul_server, false))
+ if reg.stop_flag then
+ return
+ end
- ::RETRY::
- local watch_result, watch_err =
client:get(consul_server.consul_watch_health_url)
- local watch_error_info = (watch_err ~= nil and watch_err)
- or ((watch_result ~= nil and watch_result.status ~= 200)
- and watch_result.status)
- if watch_error_info then
- log.error("connect consul: ", consul_server.consul_server_url,
- " by sub url: ", consul_server.consul_watch_health_url,
- ", got watch result: ", json_delay_encode(watch_result),
- ", with error: ", watch_error_info)
+ local up_services, fetch_err, new_catalog_index, new_health_index =
+ consul_client.fetch_services_from_server(consul_server, {
+ default_weight = reg.conf.weight,
+ sort_type = reg.conf.sort_type,
+ skip_service_map = reg.skip_service_map,
+ preserve_metadata = reg.preserve_metadata,
+ key_builder = reg.key_builder,
+ })
- return watch_type_health, default_health_error_index
- end
+ if fetch_err then
+ retry_delay = consul_client.get_retry_delay(retry_delay)
+ log.warn("get all svcs got err: ", fetch_err,
+ ", retry connecting consul after ", retry_delay, " seconds")
+ core_sleep(retry_delay)
- if consul_server.health_index > 0
- and consul_server.health_index ==
tonumber(watch_result.headers['X-Consul-Index']) then
- local random_delay = math_random(default_random_range)
- log.info("watch health has no change, re-watch consul after ",
random_delay, " seconds")
- core_sleep(random_delay)
- goto RETRY
+ check_keepalive(reg, consul_server, retry_delay)
+ return
end
- return watch_type_health, watch_result.headers['X-Consul-Index']
-end
+ if reg.stop_flag then
+ return
+ end
+ -- only update if there are actual services (empty table means no index
change)
+ if next(up_services) then
+ update_all_services(reg, consul_server.consul_server_url, up_services)
-local function check_keepalive(consul_server, retry_delay)
- if consul_server.keepalive and not exiting() then
- local ok, err = ngx_timer_at(0, _M.connect, consul_server, retry_delay)
- if not ok then
- log.error("create ngx_timer_at got error: ", err)
- return
+ if reg.dump_params then
+ ngx_timer_at(0, write_dump_services, reg)
end
end
+
+ consul_client.update_index(consul_server, new_catalog_index,
new_health_index)
+ check_keepalive(reg, consul_server, retry_delay)
end
-local function update_index(consul_server, catalog_index, health_index)
- local c_index = 0
- local h_index = 0
- if catalog_index ~= nil then
- c_index = tonumber(catalog_index)
- end
+-- ─── Registry management API ──────────────────────────────────────────
- if health_index ~= nil then
- h_index = tonumber(health_index)
+--- Create a consul registry instance.
+---
+--- conf fields: id, servers (array of URLs), token, timeout, weight,
+--- keepalive, fetch_interval, sort_type, skip_services,
+--- dump, default_service, shared_size
+---
+--- options: service_scanner (function), preserve_metadata (bool),
+--- key_builder (function(service_name) -> string)
+function _M.create_registry(conf, options)
+ options = options or {}
+ local id = conf.id
+ if not id or id == "" then
+ return nil, "registry id is required"
end
- if c_index > 0 then
- consul_server.catalog_index = c_index
+ if registries[id] then
+ _M.stop_registry(id)
end
- if h_index > 0 then
- consul_server.health_index = h_index
+ -- build skip service map
+ local skip_map = core.table.new(0, 1)
+ if conf.skip_services then
+ for _, v in ipairs(conf.skip_services) do
+ skip_map[v] = true
+ end
end
+ for _, v in ipairs(default_skip_services) do
+ skip_map[v] = true
+ end
+
+ -- build default_service
+ local default_svc
+ if conf.default_service then
+ default_svc = conf.default_service
+ default_svc.weight = conf.weight
+ end
+
+ local reg = {
+ id = id,
+ conf = conf,
+ stop_flag = false,
+ preserve_metadata = options.preserve_metadata or false,
+ key_builder = options.key_builder or default_key_builder(id),
+ service_scanner = options.service_scanner or
consul_client.get_consul_services,
+ skip_service_map = skip_map,
+ default_service = default_svc,
+ dump_params = conf.dump,
+ consul_services = core.table.new(0, 1),
+ }
+
+ registries[id] = reg
+ return reg
end
-local function is_not_empty(value)
- if value == nil or value == null
- or (type(value) == "table" and not next(value))
- or (type(value) == "string" and value == "")
- then
- return false
+function _M.start_registry(reg)
+ local dict = get_dict()
+ if not dict then
+ error('lua_shared_dict "' .. dict_name .. '" not configured')
end
- return true
-end
+ -- flush stale data for this registry
+ local prefix = reg.id .. "/"
+ local all_keys = dict:get_keys(0)
+ for _, key in ipairs(all_keys) do
+ if core.string.has_prefix(key, prefix) then
+ dict:delete(key)
+ end
+ end
+ -- load dump file if configured
+ if reg.dump_params and reg.dump_params.load_on_init then
+ read_dump_services(reg)
+ end
-local function watch_result_is_valid(watch_type, index, catalog_index,
health_index)
- if index <= 0 then
- return false
+ local consul_servers_list, err =
consul_client.format_consul_params(reg.conf)
+ if err then
+ error("format consul config got error: " .. err)
end
+ log.info("consul_server_list: ", json_delay_encode(consul_servers_list,
true))
- if watch_type == watch_type_catalog then
- if index == catalog_index then
- return false
+ for _, server in ipairs(consul_servers_list) do
+ local ok, timer_err = ngx_timer_at(0, _M.connect, reg, server)
+ if not ok then
+ error("create consul got error: " .. timer_err)
end
- else
- if index == health_index then
- return false
+
+ if server.keepalive == false then
+ ngx_timer_every(server.fetch_interval, _M.connect, reg, server)
end
end
-
- return true
end
-local function combine_sort_nodes_cmp(left, right)
- if left.host ~= right.host then
- return left.host < right.host
+function _M.stop_registry(id)
+ local reg = registries[id]
+ if not reg then
+ return
end
- return left.port < right.port
-end
-
+ reg.stop_flag = true
+ registries[id] = nil
-local function port_sort_nodes_cmp(left, right)
- return left.port < right.port
+ local dict = get_dict()
+ if dict then
+ local prefix = id .. "/"
+ local all_keys = dict:get_keys(0)
+ for _, key in ipairs(all_keys) do
+ if core.string.has_prefix(key, prefix) then
+ dict:delete(key)
+ end
+ end
+ end
end
-local function host_sort_nodes_cmp(left, right)
- return left.host < right.host
+function _M.get_registry(id)
+ return registries[id]
end
-function _M.connect(premature, consul_server, retry_delay)
- if premature then
- return
- end
-
- local catalog_thread, spawn_catalog_err = thread_spawn(watch_catalog,
consul_server)
- if not catalog_thread then
- local random_delay = math_random(default_random_range)
- log.error("failed to spawn thread watch catalog: ", spawn_catalog_err,
- ", retry connecting consul after ", random_delay, " seconds")
- core_sleep(random_delay)
+-- ─── Shared helpers ──────────────────────────────────────────────────
- check_keepalive(consul_server, retry_delay)
- return
+local function match_metadata(node_metadata, upstream_metadata)
+ if upstream_metadata == nil then
+ return true
end
- local health_thread, err = thread_spawn(watch_health, consul_server)
- if not health_thread then
- thread_kill(catalog_thread)
- local random_delay = math_random(default_random_range)
- log.error("failed to spawn thread watch health: ", err, ", retry
connecting consul after ",
- random_delay, " seconds")
- core_sleep(random_delay)
-
- check_keepalive(consul_server, retry_delay)
- return
+ if not node_metadata then
+ node_metadata = {}
end
- local thread_wait_ok, watch_type, index = thread_wait(catalog_thread,
health_thread)
- thread_kill(catalog_thread)
- thread_kill(health_thread)
- if not thread_wait_ok then
- local random_delay = math_random(default_random_range)
- log.error("failed to wait thread: ", watch_type, ", retry connecting
consul after ",
- random_delay, " seconds")
- core_sleep(random_delay)
-
- check_keepalive(consul_server, retry_delay)
- return
+ for k, v in pairs(upstream_metadata) do
+ if not node_metadata[k] or node_metadata[k] ~= v then
+ return false
+ end
end
- -- double check index has changed
- if not watch_result_is_valid(tonumber(watch_type),
- tonumber(index), consul_server.catalog_index,
consul_server.health_index) then
- retry_delay = get_retry_delay(retry_delay)
- log.warn("get all svcs got err, retry connecting consul after ",
retry_delay, " seconds")
- core_sleep(retry_delay)
+ return true
+end
- check_keepalive(consul_server, retry_delay)
- return
- end
- local consul_client = resty_consul:new({
- host = consul_server.host,
- port = consul_server.port,
- connect_timeout = consul_server.connect_timeout,
- read_timeout = consul_server.read_timeout,
- default_args = {
- token = consul_server.token
- }
- })
- local catalog_success, catalog_res, catalog_err = pcall(function()
- return consul_client:get(consul_server.consul_watch_catalog_url)
- end)
- if not catalog_success then
- log.error("connect consul: ", consul_server.consul_server_url,
- " by sub url: ", consul_server.consul_watch_catalog_url,
- ", got catalog result: ", json_delay_encode(catalog_res))
- check_keepalive(consul_server, retry_delay)
- return
+function _M.get_nodes(key, metadata)
+ local dict = get_dict()
+ if not dict then
+ return nil
end
- local catalog_error_info = (catalog_err ~= nil and catalog_err)
- or ((catalog_res ~= nil and catalog_res.status ~= 200)
- and catalog_res.status)
- if catalog_error_info then
- log.error("connect consul: ", consul_server.consul_server_url,
- " by sub url: ", consul_server.consul_watch_catalog_url,
- ", got catalog result: ", json_delay_encode(catalog_res),
- ", with error: ", catalog_error_info)
-
- retry_delay = get_retry_delay(retry_delay)
- log.warn("get all svcs got err, retry connecting consul after ",
retry_delay, " seconds")
- core_sleep(retry_delay)
- check_keepalive(consul_server, retry_delay)
- return
+ local value = dict:get(key)
+ if not value then
+ return nil
end
- -- get health index
- local success, health_res, health_err = pcall(function()
- return consul_client:get(consul_server.consul_watch_health_url)
- end)
- if not success then
- log.error("connect consul: ", consul_server.consul_server_url,
- " by sub url: ", consul_server.consul_watch_health_url,
- ", got health result: ", json_delay_encode(health_res))
- check_keepalive(consul_server, retry_delay)
- return
+ local nodes = core.json.decode(value)
+ if not metadata then
+ return nodes
end
- local health_error_info = (health_err ~= nil and health_err)
- or ((health_res ~= nil and health_res.status ~= 200)
- and health_res.status)
- if health_error_info then
- log.error("connect consul: ", consul_server.consul_server_url,
- " by sub url: ", consul_server.consul_watch_health_url,
- ", got health result: ", json_delay_encode(health_res),
- ", with error: ", health_error_info)
-
- retry_delay = get_retry_delay(retry_delay)
- log.warn("get all svcs got err, retry connecting consul after ",
retry_delay, " seconds")
- core_sleep(retry_delay)
- check_keepalive(consul_server, retry_delay)
- return
+ local res = {}
+ for _, node in ipairs(nodes) do
+ if match_metadata(node.metadata, metadata) then
+ core.table.insert(res, node)
+ end
end
+ return res
+end
- log.info("connect consul: ", consul_server.consul_server_url,
- ", catalog_result status: ", catalog_res.status,
- ", catalog_result.headers.index: ",
catalog_res.headers['X-Consul-Index'],
- ", consul_server.index: ", consul_server.index,
- ", consul_server: ", json_delay_encode(consul_server))
-
- -- if the current index is different from the last index, then update the
service
- if (consul_server.catalog_index ~=
tonumber(catalog_res.headers['X-Consul-Index']))
- or (consul_server.health_index ~=
tonumber(health_res.headers['X-Consul-Index'])) then
- local up_services = core.table.new(0, #catalog_res.body)
- for service_name, _ in pairs(catalog_res.body) do
- -- check if the service_name is 'skip service'
- if skip_service_map[service_name] then
- goto CONTINUE
- end
- -- get node from service
- local svc_url = consul_server.consul_sub_url .. "/" .. service_name
- local svc_success, result, get_err = pcall(function()
- return consul_client:get(svc_url, {passing = true})
- end)
- local error_info = (get_err ~= nil and get_err) or
- ((result ~= nil and result.status ~= 200) and
result.status)
- if not svc_success or error_info then
- log.error("connect consul: ", consul_server.consul_server_url,
- ", by service url: ", svc_url, ", with error: ",
error_info)
- goto CONTINUE
- end
+-- ─── Standard discovery interface ─────────────────────────────────────
- -- decode body, decode json, update service, error handling
- -- check result body is not nil and not empty
- if is_not_empty(result.body) then
- -- add services to table
- local nodes = up_services[service_name]
- local nodes_uniq = {}
- for _, node in ipairs(result.body) do
- if not node.Service then
- goto CONTINUE
- end
-
- local svc_address, svc_port = node.Service.Address,
node.Service.Port
- -- Handle nil or 0 port case - default to 80 for HTTP
services
- if not svc_port or svc_port == 0 then
- svc_port = 80
- end
- -- if nodes is nil, new nodes table and set to up_services
- if not nodes then
- nodes = core.table.new(1, 0)
- up_services[service_name] = nodes
- end
- -- not store duplicate service IDs.
- local service_id = svc_address .. ":" .. svc_port
- if not nodes_uniq[service_id] then
- -- add node to nodes table
- core.table.insert(nodes, {
- host = svc_address,
- port = tonumber(svc_port),
- weight = default_weight,
- })
- nodes_uniq[service_id] = true
- end
- end
- if nodes then
- if sort_type == "port_sort" then
- core.table.sort(nodes, port_sort_nodes_cmp)
+local function fetch_node_from_shdict(service_name)
+ local dict = get_dict()
+ if not dict then
+ return nil, "consul shared dict not available"
+ end
- elseif sort_type == "host_sort" then
- core.table.sort(nodes, host_sort_nodes_cmp)
+ -- look up with the default registry prefix
+ local value = dict:get("default/" .. service_name)
+ if not value then
+ return nil, "consul service not found: " .. service_name
+ end
- elseif sort_type == "combine_sort" then
- core.table.sort(nodes, combine_sort_nodes_cmp)
+ local nodes, err = core.json.decode(value)
+ if not nodes then
+ return nil, "failed to decode nodes for service: "
+ .. service_name .. ", error: " .. (err or "")
+ end
- end
- end
- up_services[service_name] = nodes
- end
- :: CONTINUE ::
- end
+ return nodes
+end
- update_all_services(consul_server.consul_server_url, up_services)
- if dump_params then
- ngx_timer_at(0, write_dump_services)
- end
+function _M.nodes(service_name)
+ local default_reg = registries["default"]
+ local default_svc = default_reg and default_reg.default_service
- update_index(consul_server,
- catalog_res.headers['X-Consul-Index'],
- health_res.headers['X-Consul-Index'])
+ local nodes, err = nodes_cache(service_name, nil,
+ fetch_node_from_shdict, service_name)
+ if not nodes then
+ log.error("fetch nodes failed by ", service_name, ", error: ", err)
+ return default_svc and {default_svc}
end
- check_keepalive(consul_server, retry_delay)
+ log.info("process id: ", ngx_worker_id(), ", [", service_name, "] = ",
+ json_delay_encode(nodes, true))
+
+ return nodes
end
-local function format_consul_params(consul_conf)
- local consul_server_list = core.table.new(0, #consul_conf.servers)
+function _M.all_nodes()
+ local dict = get_dict()
+ if not dict then
+ return {}
+ end
+
+ local keys = dict:get_keys(0)
+ local services = core.table.new(0, #keys)
+ local prefix = "default/"
+ for i, key in ipairs(keys) do
+ -- only return default registry nodes, strip prefix for BC
+ if core.string.has_prefix(key, prefix) then
+ local value = dict:get(key)
+ if value then
+ local nodes, err = core.json.decode(value)
+ if nodes then
+ local bare_key = key:sub(#prefix + 1)
+ services[bare_key] = nodes
+ else
+ log.error("failed to decode nodes for service: ", key, ",
error: ", err)
+ end
+ end
+ end
- for _, v in pairs(consul_conf.servers) do
- local scheme, host, port, path = unpack(http.parse_uri(nil, v))
- if scheme ~= "http" then
- return nil, "only support consul http schema address, eg:
http://address:port"
- elseif path ~= "/" or core.string.has_suffix(v, '/') then
- return nil, "invalid consul server address, the valid format:
http://address:port"
+ if i % 100 == 0 then
+ ngx.sleep(0)
end
- core.table.insert(consul_server_list, {
- host = host,
- port = port,
- token = consul_conf.token,
- connect_timeout = consul_conf.timeout.connect,
- read_timeout = consul_conf.timeout.read,
- wait_timeout = consul_conf.timeout.wait,
- consul_watch_catalog_url = "/catalog/services",
- consul_sub_url = "/health/service",
- consul_watch_health_url = "/health/state/any",
- consul_server_url = v .. "/v1",
- weight = consul_conf.weight,
- keepalive = consul_conf.keepalive,
- health_index = 0,
- catalog_index = 0,
- fetch_interval = consul_conf.fetch_interval -- fetch interval to
next connect consul
- })
end
- return consul_server_list, nil
+ return services
end
+-- ─── Initialization ───────────────────────────────────────────────────
+
function _M.init_worker()
- local consul_conf = local_conf.discovery.consul
- dump_params = consul_conf.dump
+ local consul_conf = local_conf.discovery and local_conf.discovery.consul
+ if not consul_conf then
+ return
+ end
- default_weight = consul_conf.weight
- sort_type = consul_conf.sort_type
- -- set default service, used when the server node cannot be found
- if consul_conf.default_service then
- default_service = consul_conf.default_service
- default_service.weight = default_weight
+ local dict = ngx.shared[dict_name]
+ if not dict then
+ error('lua_shared_dict "' .. dict_name .. '" not configured')
end
+ consul_dict = dict
if process.type() ~= "privileged agent" then
return
end
- -- flush stale data that may persist across reloads,
- -- since consul_services is re-initialized empty
- consul_dict:flush_all()
-
- if consul_conf.dump then
- if consul_conf.dump.load_on_init then
- read_dump_services()
- end
- end
+ -- flush stale data that may persist across reloads
+ dict:flush_all()
log.notice("consul_conf: ", json_delay_encode(consul_conf, true))
- if consul_conf.skip_services then
- skip_service_map = core.table.new(0, #consul_conf.skip_services)
- for _, v in ipairs(consul_conf.skip_services) do
- skip_service_map[v] = true
- end
- end
- -- set up default skip service
- for _, v in ipairs(default_skip_services) do
- skip_service_map[v] = true
- end
-
- local consul_servers_list, err = format_consul_params(consul_conf)
- if err then
- error("format consul config got error: " .. err)
+ -- shallow copy to avoid mutating cached config
+ local conf = {}
+ for k, v in pairs(consul_conf) do
+ conf[k] = v
end
- log.info("consul_server_list: ", json_delay_encode(consul_servers_list,
true))
-
- consul_services = core.table.new(0, 1)
- -- success or failure
- for _, server in ipairs(consul_servers_list) do
- local ok, err = ngx_timer_at(0, _M.connect, server)
- if not ok then
- error("create consul got error: " .. err)
- end
+ conf.id = "default"
- if server.keepalive == false then
- ngx_timer_every(server.fetch_interval, _M.connect, server)
- end
- end
+ local reg = _M.create_registry(conf)
+ _M.start_registry(reg)
Review Comment:
`init_worker()` returns early on non-privileged workers before creating the
default registry. As a result, `registries["default"]` is nil in `nodes()` (so
the `default_service` fallback is lost) and in `control_api()`/dump handling.
To preserve the previous behavior, create (but don’t start) the default
registry on all workers, and only run `start_registry()` / timers in the
privileged agent.
##########
apisix/discovery/consul/client.lua:
##########
@@ -0,0 +1,479 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements. See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+--- Reusable HTTP client primitives for Consul service discovery.
+--- Extracted from init.lua so that both static-config mode and
+--- dynamic-config mode can share the same core logic.
+
+local require = require
+local core = require("apisix.core")
+local core_sleep = require("apisix.core.utils").sleep
+local resty_consul = require('resty.consul')
+local http = require('resty.http')
+local ipairs = ipairs
+local pairs = pairs
+local unpack = unpack
+local tonumber = tonumber
+local type = type
+local next = next
+local ngx = ngx
+local math_random = math.random
+local log = core.log
+local json_delay_encode = core.json.delay_encode
+local pcall = pcall
+local null = ngx.null
+
+local _M = {}
+
+local default_random_range = 5
+local default_catalog_error_index = -1
+local default_health_error_index = -2
+local watch_type_catalog = 1
+local watch_type_health = 2
+local max_retry_time = 256
+
+
+-- ─── helpers ──────────────────────────────────────────────────────────
+
+function _M.get_retry_delay(retry_delay)
+ if not retry_delay or retry_delay >= max_retry_time then
+ retry_delay = 1
+ else
+ retry_delay = retry_delay * 4
+ end
+
+ return retry_delay
+end
+
+
+local function is_not_empty(value)
+ if value == nil or value == null
+ or (type(value) == "table" and not next(value))
+ or (type(value) == "string" and value == "")
+ then
+ return false
+ end
+
+ return true
+end
+
+
+-- ─── sort comparators ─────────────────────────────────────────────────
+
+local function combine_sort_nodes_cmp(left, right)
+ if left.host ~= right.host then
+ return left.host < right.host
+ end
+
+ return left.port < right.port
+end
+
+
+local function port_sort_nodes_cmp(left, right)
+ return left.port < right.port
+end
+
+
+local function host_sort_nodes_cmp(left, right)
+ return left.host < right.host
+end
+
+
+function _M.sort_nodes(nodes, sort_type)
+ if not nodes or not sort_type or sort_type == "origin" then
+ return
+ end
+
+ if sort_type == "port_sort" then
+ core.table.sort(nodes, port_sort_nodes_cmp)
+ elseif sort_type == "host_sort" then
+ core.table.sort(nodes, host_sort_nodes_cmp)
+ elseif sort_type == "combine_sort" then
+ core.table.sort(nodes, combine_sort_nodes_cmp)
+ end
+end
+
+
+-- ─── resty.consul options ─────────────────────────────────────────────
+
+local function get_opts(consul_server, is_catalog)
+ local opts = {
+ host = consul_server.host,
+ port = consul_server.port,
+ connect_timeout = consul_server.connect_timeout,
+ read_timeout = consul_server.read_timeout,
+ default_args = {
+ token = consul_server.token,
+ }
+ }
+ if not consul_server.keepalive then
+ return opts
+ end
+
+ opts.default_args.wait = consul_server.wait_timeout
+
+ if is_catalog then
+ opts.default_args.index = consul_server.catalog_index
+ else
+ opts.default_args.index = consul_server.health_index
+ end
+
+ return opts
+end
+
+
+-- ─── blocking query watchers ──────────────────────────────────────────
+
+function _M.watch_catalog(consul_server)
+ local client = resty_consul:new(get_opts(consul_server, true))
+
+ ::RETRY::
+ local watch_result, watch_err =
client:get(consul_server.consul_watch_catalog_url)
+ local watch_error_info = (watch_err ~= nil and watch_err)
+ or ((watch_result ~= nil and watch_result.status
~= 200)
+ and watch_result.status)
+ if watch_error_info then
+ log.error("connect consul: ", consul_server.consul_server_url,
+ " by sub url: ", consul_server.consul_watch_catalog_url,
+ ", got watch result: ", json_delay_encode(watch_result),
+ ", with error: ", watch_error_info)
+
+ return watch_type_catalog, default_catalog_error_index
+ end
+
+ if consul_server.catalog_index > 0
+ and consul_server.catalog_index ==
tonumber(watch_result.headers['X-Consul-Index']) then
+ local random_delay = math_random(default_random_range)
+ log.info("watch catalog has no change, re-watch consul after ",
random_delay, " seconds")
+ core_sleep(random_delay)
+ goto RETRY
+ end
+
+ return watch_type_catalog, watch_result.headers['X-Consul-Index']
+end
+
+
+function _M.watch_health(consul_server)
+ local client = resty_consul:new(get_opts(consul_server, false))
+
+ ::RETRY::
+ local watch_result, watch_err =
client:get(consul_server.consul_watch_health_url)
+ local watch_error_info = (watch_err ~= nil and watch_err)
+ or ((watch_result ~= nil and watch_result.status ~= 200)
+ and watch_result.status)
+ if watch_error_info then
+ log.error("connect consul: ", consul_server.consul_server_url,
+ " by sub url: ", consul_server.consul_watch_health_url,
+ ", got watch result: ", json_delay_encode(watch_result),
+ ", with error: ", watch_error_info)
+
+ return watch_type_health, default_health_error_index
+ end
+
+ if consul_server.health_index > 0
+ and consul_server.health_index ==
tonumber(watch_result.headers['X-Consul-Index']) then
+ local random_delay = math_random(default_random_range)
+ log.info("watch health has no change, re-watch consul after ",
random_delay, " seconds")
+ core_sleep(random_delay)
+ goto RETRY
+ end
+
+ return watch_type_health, watch_result.headers['X-Consul-Index']
+end
+
+
+function _M.watch_result_is_valid(watch_type, index, catalog_index,
health_index)
+ if index <= 0 then
+ return false
+ end
+
+ if watch_type == watch_type_catalog then
+ if index == catalog_index then
+ return false
+ end
+ else
+ if index == health_index then
+ return false
+ end
+ end
+
+ return true
+end
+
+
+function _M.update_index(consul_server, catalog_index, health_index)
+ local c_index = 0
+ local h_index = 0
+ if catalog_index ~= nil then
+ c_index = tonumber(catalog_index)
+ end
+
+ if health_index ~= nil then
+ h_index = tonumber(health_index)
+ end
+
+ if c_index > 0 then
+ consul_server.catalog_index = c_index
+ end
+
+ if h_index > 0 then
+ consul_server.health_index = h_index
+ end
+end
+
+
+-- ─── URL parsing ──────────────────────────────────────────────────────
+
+function _M.format_consul_params(consul_conf)
+ local servers = consul_conf.servers
+ local consul_server_list = core.table.new(0, #servers)
+
+ for _, v in pairs(servers) do
+ local scheme, host, port, path = unpack(http.parse_uri(nil, v))
+ if scheme ~= "http" then
+ return nil, "only support consul http schema address, eg:
http://address:port"
+ elseif path ~= "/" or core.string.has_suffix(v, '/') then
+ return nil, "invalid consul server address, the valid format:
http://address:port"
+ end
+ core.table.insert(consul_server_list, {
+ host = host,
+ port = port,
+ token = consul_conf.token,
+ connect_timeout = consul_conf.timeout.connect,
+ read_timeout = consul_conf.timeout.read,
+ wait_timeout = consul_conf.timeout.wait,
+ consul_watch_catalog_url = "/catalog/services",
+ consul_sub_url = "/health/service",
+ consul_watch_health_url = "/health/state/any",
+ consul_server_url = v .. "/v1",
+ weight = consul_conf.weight,
+ keepalive = consul_conf.keepalive,
+ health_index = 0,
+ catalog_index = 0,
+ fetch_interval = consul_conf.fetch_interval,
+ })
+ end
+ return consul_server_list, nil
+end
+
+
+-- ─── service fetching ─────────────────────────────────────────────────
+
+--- Fetch all services from a single consul server.
+--- Returns: up_services (table of service_name -> nodes), catalog_index,
health_index
+--- On failure: nil, err_string
+---
+--- options:
+--- default_weight (number) default node weight
+--- sort_type (string)
"origin"/"host_sort"/"port_sort"/"combine_sort"
+--- skip_service_map (table) set of service names to skip
+--- preserve_metadata (bool) include Service.Meta in returned nodes
+--- key_builder (function) key_builder(service_name) -> string for
the result key
+function _M.fetch_services_from_server(consul_server, options)
Review Comment:
The docstring for `fetch_services_from_server` doesn’t match the actual return
values. The function returns 4 values (`up_services`, `err`, `catalog_index`,
`health_index`), but the comment documents only 3 on success (`up_services`,
`catalog_index`, `health_index`) and a separate `nil, err_string` pair on
failure. Please update the comment to reflect the real signature so callers
destructure the results correctly.
##########
apisix/discovery/consul/init.lua:
##########
@@ -252,496 +223,410 @@ local function show_dump_file()
end
-local function get_retry_delay(retry_delay)
- if not retry_delay or retry_delay >= max_retry_time then
- retry_delay = 1
- else
- retry_delay = retry_delay * 4
+-- ─── polling loop ─────────────────────────────────────────────────────
+
+local function check_keepalive(reg, consul_server, retry_delay)
+ if reg.stop_flag then
+ return
end
- return retry_delay
+ if consul_server.keepalive and not exiting() then
+ local ok, err = ngx_timer_at(0, _M.connect, reg, consul_server,
retry_delay)
+ if not ok then
+ log.error("create ngx_timer_at got error: ", err)
+ return
+ end
+ end
end
-local function get_opts(consul_server, is_catalog)
- local opts = {
- host = consul_server.host,
- port = consul_server.port,
- connect_timeout = consul_server.connect_timeout,
- read_timeout = consul_server.read_timeout,
- default_args = {
- token = consul_server.token,
- }
- }
- if not consul_server.keepalive then
- return opts
+function _M.connect(premature, reg, consul_server, retry_delay)
+ if premature or reg.stop_flag then
+ return
end
- opts.default_args.wait = consul_server.wait_timeout --blocked wait!=0;
unblocked by wait=0
+ local catalog_thread, spawn_catalog_err =
thread_spawn(consul_client.watch_catalog,
+ consul_server)
+ if not catalog_thread then
+ local random_delay = math_random(default_random_range)
+ log.error("failed to spawn thread watch catalog: ", spawn_catalog_err,
+ ", retry connecting consul after ", random_delay, " seconds")
+ core_sleep(random_delay)
- if is_catalog then
- opts.default_args.index = consul_server.catalog_index
- else
- opts.default_args.index = consul_server.health_index
+ check_keepalive(reg, consul_server, retry_delay)
+ return
end
- return opts
-end
-
-
-local function watch_catalog(consul_server)
- local client = resty_consul:new(get_opts(consul_server, true))
-
- ::RETRY::
- local watch_result, watch_err =
client:get(consul_server.consul_watch_catalog_url)
- local watch_error_info = (watch_err ~= nil and watch_err)
- or ((watch_result ~= nil and watch_result.status
~= 200)
- and watch_result.status)
- if watch_error_info then
- log.error("connect consul: ", consul_server.consul_server_url,
- " by sub url: ", consul_server.consul_watch_catalog_url,
- ", got watch result: ", json_delay_encode(watch_result),
- ", with error: ", watch_error_info)
+ local health_thread, err = thread_spawn(consul_client.watch_health,
consul_server)
+ if not health_thread then
+ thread_kill(catalog_thread)
+ local random_delay = math_random(default_random_range)
+ log.error("failed to spawn thread watch health: ", err,
+ ", retry connecting consul after ", random_delay, " seconds")
+ core_sleep(random_delay)
- return watch_type_catalog, default_catalog_error_index
+ check_keepalive(reg, consul_server, retry_delay)
+ return
end
- if consul_server.catalog_index > 0
- and consul_server.catalog_index ==
tonumber(watch_result.headers['X-Consul-Index']) then
+ local thread_wait_ok, watch_type, index = thread_wait(catalog_thread,
health_thread)
+ thread_kill(catalog_thread)
+ thread_kill(health_thread)
+ if not thread_wait_ok then
local random_delay = math_random(default_random_range)
- log.info("watch catalog has no change, re-watch consul after ",
random_delay, " seconds")
+ log.error("failed to wait thread: ", watch_type, ", retry connecting
consul after ",
+ random_delay, " seconds")
core_sleep(random_delay)
- goto RETRY
+
+ check_keepalive(reg, consul_server, retry_delay)
+ return
end
- return watch_type_catalog, watch_result.headers['X-Consul-Index']
-end
+ if not consul_client.watch_result_is_valid(tonumber(watch_type),
+ tonumber(index), consul_server.catalog_index,
consul_server.health_index) then
+ retry_delay = consul_client.get_retry_delay(retry_delay)
+ log.warn("get all svcs got err, retry connecting consul after ",
retry_delay, " seconds")
+ core_sleep(retry_delay)
+ check_keepalive(reg, consul_server, retry_delay)
+ return
+ end
-local function watch_health(consul_server)
- local client = resty_consul:new(get_opts(consul_server, false))
+ if reg.stop_flag then
+ return
+ end
- ::RETRY::
- local watch_result, watch_err =
client:get(consul_server.consul_watch_health_url)
- local watch_error_info = (watch_err ~= nil and watch_err)
- or ((watch_result ~= nil and watch_result.status ~= 200)
- and watch_result.status)
- if watch_error_info then
- log.error("connect consul: ", consul_server.consul_server_url,
- " by sub url: ", consul_server.consul_watch_health_url,
- ", got watch result: ", json_delay_encode(watch_result),
- ", with error: ", watch_error_info)
+ local up_services, fetch_err, new_catalog_index, new_health_index =
+ consul_client.fetch_services_from_server(consul_server, {
+ default_weight = reg.conf.weight,
+ sort_type = reg.conf.sort_type,
+ skip_service_map = reg.skip_service_map,
+ preserve_metadata = reg.preserve_metadata,
+ key_builder = reg.key_builder,
+ })
- return watch_type_health, default_health_error_index
- end
+ if fetch_err then
+ retry_delay = consul_client.get_retry_delay(retry_delay)
+ log.warn("get all svcs got err: ", fetch_err,
+ ", retry connecting consul after ", retry_delay, " seconds")
+ core_sleep(retry_delay)
- if consul_server.health_index > 0
- and consul_server.health_index ==
tonumber(watch_result.headers['X-Consul-Index']) then
- local random_delay = math_random(default_random_range)
- log.info("watch health has no change, re-watch consul after ",
random_delay, " seconds")
- core_sleep(random_delay)
- goto RETRY
+ check_keepalive(reg, consul_server, retry_delay)
+ return
end
- return watch_type_health, watch_result.headers['X-Consul-Index']
-end
+ if reg.stop_flag then
+ return
+ end
+ -- only update if there are actual services (empty table means no index
change)
+ if next(up_services) then
+ update_all_services(reg, consul_server.consul_server_url, up_services)
-local function check_keepalive(consul_server, retry_delay)
- if consul_server.keepalive and not exiting() then
- local ok, err = ngx_timer_at(0, _M.connect, consul_server, retry_delay)
- if not ok then
- log.error("create ngx_timer_at got error: ", err)
- return
+ if reg.dump_params then
+ ngx_timer_at(0, write_dump_services, reg)
end
end
+
+ consul_client.update_index(consul_server, new_catalog_index,
new_health_index)
+ check_keepalive(reg, consul_server, retry_delay)
end
-local function update_index(consul_server, catalog_index, health_index)
- local c_index = 0
- local h_index = 0
- if catalog_index ~= nil then
- c_index = tonumber(catalog_index)
- end
+-- ─── Registry management API ──────────────────────────────────────────
- if health_index ~= nil then
- h_index = tonumber(health_index)
+--- Create a consul registry instance.
+---
+--- conf fields: id, servers (array of URLs), token, timeout, weight,
+--- keepalive, fetch_interval, sort_type, skip_services,
+--- dump, default_service, shared_size
+---
+--- options: service_scanner (function), preserve_metadata (bool),
+--- key_builder (function(service_name) -> string)
+function _M.create_registry(conf, options)
+ options = options or {}
+ local id = conf.id
+ if not id or id == "" then
+ return nil, "registry id is required"
end
- if c_index > 0 then
- consul_server.catalog_index = c_index
+ if registries[id] then
+ _M.stop_registry(id)
end
- if h_index > 0 then
- consul_server.health_index = h_index
+ -- build skip service map
+ local skip_map = core.table.new(0, 1)
+ if conf.skip_services then
+ for _, v in ipairs(conf.skip_services) do
+ skip_map[v] = true
+ end
end
+ for _, v in ipairs(default_skip_services) do
+ skip_map[v] = true
+ end
+
+ -- build default_service
+ local default_svc
+ if conf.default_service then
+ default_svc = conf.default_service
+ default_svc.weight = conf.weight
+ end
+
+ local reg = {
+ id = id,
+ conf = conf,
+ stop_flag = false,
+ preserve_metadata = options.preserve_metadata or false,
+ key_builder = options.key_builder or default_key_builder(id),
+ service_scanner = options.service_scanner or
consul_client.get_consul_services,
+ skip_service_map = skip_map,
+ default_service = default_svc,
+ dump_params = conf.dump,
+ consul_services = core.table.new(0, 1),
+ }
+
+ registries[id] = reg
+ return reg
end
-local function is_not_empty(value)
- if value == nil or value == null
- or (type(value) == "table" and not next(value))
- or (type(value) == "string" and value == "")
- then
- return false
+function _M.start_registry(reg)
+ local dict = get_dict()
+ if not dict then
+ error('lua_shared_dict "' .. dict_name .. '" not configured')
end
- return true
-end
+ -- flush stale data for this registry
+ local prefix = reg.id .. "/"
+ local all_keys = dict:get_keys(0)
+ for _, key in ipairs(all_keys) do
+ if core.string.has_prefix(key, prefix) then
+ dict:delete(key)
+ end
+ end
+ -- load dump file if configured
+ if reg.dump_params and reg.dump_params.load_on_init then
+ read_dump_services(reg)
+ end
-local function watch_result_is_valid(watch_type, index, catalog_index,
health_index)
- if index <= 0 then
- return false
+ local consul_servers_list, err =
consul_client.format_consul_params(reg.conf)
+ if err then
+ error("format consul config got error: " .. err)
end
+ log.info("consul_server_list: ", json_delay_encode(consul_servers_list,
true))
- if watch_type == watch_type_catalog then
- if index == catalog_index then
- return false
+ for _, server in ipairs(consul_servers_list) do
+ local ok, timer_err = ngx_timer_at(0, _M.connect, reg, server)
+ if not ok then
+ error("create consul got error: " .. timer_err)
end
- else
- if index == health_index then
- return false
+
+ if server.keepalive == false then
+ ngx_timer_every(server.fetch_interval, _M.connect, reg, server)
end
Review Comment:
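For servers with `keepalive == false`, `start_registry()` schedules polling
with `ngx.timer.every(...)`. `stop_registry()` sets `reg.stop_flag`, but a
timer created by `ngx.timer.every` cannot be cancelled, so it will keep firing
every `fetch_interval` seconds forever (even though `_M.connect` returns
immediately once the flag is set). Since a public stop API was added, consider
switching to a self-rescheduling `ngx.timer.at` loop (the approach used by the
nacos discovery) that checks `reg.stop_flag` before re-arming, so that stopping
a registry actually halts future wakeups.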
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]