nic-6443 commented on code in PR #13230:
URL: https://github.com/apache/apisix/pull/13230#discussion_r3090726016


##########
apisix/discovery/consul/init.lua:
##########
@@ -252,496 +223,410 @@ local function show_dump_file()
 end
 
 
-local function get_retry_delay(retry_delay)
-    if not retry_delay or retry_delay >= max_retry_time then
-        retry_delay = 1
-    else
-        retry_delay = retry_delay * 4
+-- ─── polling loop ─────────────────────────────────────────────────────
+
+local function check_keepalive(reg, consul_server, retry_delay)
+    if reg.stop_flag then
+        return
     end
 
-    return retry_delay
+    if consul_server.keepalive and not exiting() then
+        local ok, err = ngx_timer_at(0, _M.connect, reg, consul_server, 
retry_delay)
+        if not ok then
+            log.error("create ngx_timer_at got error: ", err)
+            return
+        end
+    end
 end
 
 
-local function get_opts(consul_server, is_catalog)
-    local opts = {
-        host = consul_server.host,
-        port = consul_server.port,
-        connect_timeout = consul_server.connect_timeout,
-        read_timeout = consul_server.read_timeout,
-        default_args = {
-            token = consul_server.token,
-        }
-    }
-    if not consul_server.keepalive then
-        return opts
+function _M.connect(premature, reg, consul_server, retry_delay)
+    if premature or reg.stop_flag then
+        return
     end
 
-    opts.default_args.wait = consul_server.wait_timeout --blocked wait!=0; unblocked by wait=0
+    local catalog_thread, spawn_catalog_err = 
thread_spawn(consul_client.watch_catalog,
+                                                           consul_server)
+    if not catalog_thread then
+        local random_delay = math_random(default_random_range)
+        log.error("failed to spawn thread watch catalog: ", spawn_catalog_err,
+            ", retry connecting consul after ", random_delay, " seconds")
+        core_sleep(random_delay)
 
-    if is_catalog then
-        opts.default_args.index = consul_server.catalog_index
-    else
-        opts.default_args.index = consul_server.health_index
+        check_keepalive(reg, consul_server, retry_delay)
+        return
     end
 
-    return opts
-end
-
-
-local function watch_catalog(consul_server)
-    local client = resty_consul:new(get_opts(consul_server, true))
-
-    ::RETRY::
-    local watch_result, watch_err = 
client:get(consul_server.consul_watch_catalog_url)
-    local watch_error_info = (watch_err ~= nil and watch_err)
-                             or ((watch_result ~= nil and watch_result.status 
~= 200)
-                             and watch_result.status)
-    if watch_error_info then
-        log.error("connect consul: ", consul_server.consul_server_url,
-            " by sub url: ", consul_server.consul_watch_catalog_url,
-            ", got watch result: ", json_delay_encode(watch_result),
-            ", with error: ", watch_error_info)
+    local health_thread, err = thread_spawn(consul_client.watch_health, 
consul_server)
+    if not health_thread then
+        thread_kill(catalog_thread)
+        local random_delay = math_random(default_random_range)
+        log.error("failed to spawn thread watch health: ", err,
+            ", retry connecting consul after ", random_delay, " seconds")
+        core_sleep(random_delay)
 
-        return watch_type_catalog, default_catalog_error_index
+        check_keepalive(reg, consul_server, retry_delay)
+        return
     end
 
-    if consul_server.catalog_index > 0
-            and consul_server.catalog_index == 
tonumber(watch_result.headers['X-Consul-Index']) then
+    local thread_wait_ok, watch_type, index = thread_wait(catalog_thread, 
health_thread)
+    thread_kill(catalog_thread)
+    thread_kill(health_thread)
+    if not thread_wait_ok then
         local random_delay = math_random(default_random_range)
-        log.info("watch catalog has no change, re-watch consul after ", 
random_delay, " seconds")
+        log.error("failed to wait thread: ", watch_type, ", retry connecting consul after ",
+                random_delay, " seconds")
         core_sleep(random_delay)
-        goto RETRY
+
+        check_keepalive(reg, consul_server, retry_delay)
+        return
     end
 
-    return watch_type_catalog, watch_result.headers['X-Consul-Index']
-end
+    if not consul_client.watch_result_is_valid(tonumber(watch_type),
+            tonumber(index), consul_server.catalog_index, 
consul_server.health_index) then
+        retry_delay = consul_client.get_retry_delay(retry_delay)
+        log.warn("get all svcs got err, retry connecting consul after ", 
retry_delay, " seconds")
+        core_sleep(retry_delay)
 
+        check_keepalive(reg, consul_server, retry_delay)
+        return
+    end
 
-local function watch_health(consul_server)
-    local client = resty_consul:new(get_opts(consul_server, false))
+    if reg.stop_flag then
+        return
+    end
 
-    ::RETRY::
-    local watch_result, watch_err = 
client:get(consul_server.consul_watch_health_url)
-    local watch_error_info = (watch_err ~= nil and watch_err)
-            or ((watch_result ~= nil and watch_result.status ~= 200)
-            and watch_result.status)
-    if watch_error_info then
-        log.error("connect consul: ", consul_server.consul_server_url,
-            " by sub url: ", consul_server.consul_watch_health_url,
-            ", got watch result: ", json_delay_encode(watch_result),
-            ", with error: ", watch_error_info)
+    local up_services, fetch_err, new_catalog_index, new_health_index =
+        consul_client.fetch_services_from_server(consul_server, {
+            default_weight    = reg.conf.weight,
+            sort_type         = reg.conf.sort_type,
+            skip_service_map  = reg.skip_service_map,
+            preserve_metadata = reg.preserve_metadata,
+            key_builder       = reg.key_builder,
+        })
 
-        return watch_type_health, default_health_error_index
-    end
+    if fetch_err then
+        retry_delay = consul_client.get_retry_delay(retry_delay)
+        log.warn("get all svcs got err: ", fetch_err,
+                 ", retry connecting consul after ", retry_delay, " seconds")
+        core_sleep(retry_delay)
 
-    if consul_server.health_index > 0
-            and consul_server.health_index == 
tonumber(watch_result.headers['X-Consul-Index']) then
-        local random_delay = math_random(default_random_range)
-        log.info("watch health has no change, re-watch consul after ", 
random_delay, " seconds")
-        core_sleep(random_delay)
-        goto RETRY
+        check_keepalive(reg, consul_server, retry_delay)
+        return
     end
 
-    return watch_type_health, watch_result.headers['X-Consul-Index']
-end
+    if reg.stop_flag then
+        return
+    end
 
+    -- only update if there are actual services (empty table means no index change)
+    if next(up_services) then
+        update_all_services(reg, consul_server.consul_server_url, up_services)
 
-local function check_keepalive(consul_server, retry_delay)
-    if consul_server.keepalive and not exiting() then
-        local ok, err = ngx_timer_at(0, _M.connect, consul_server, retry_delay)
-        if not ok then
-            log.error("create ngx_timer_at got error: ", err)
-            return
+        if reg.dump_params then
+            ngx_timer_at(0, write_dump_services, reg)
         end
     end
+
+    consul_client.update_index(consul_server, new_catalog_index, 
new_health_index)
+    check_keepalive(reg, consul_server, retry_delay)
 end
 
 
-local function update_index(consul_server, catalog_index, health_index)
-    local c_index = 0
-    local h_index = 0
-    if catalog_index ~= nil then
-        c_index = tonumber(catalog_index)
-    end
+-- ─── Registry management API ──────────────────────────────────────────
 
-    if health_index ~= nil then
-        h_index = tonumber(health_index)
+--- Create a consul registry instance.
+---
+--- conf fields: id, servers (array of URLs), token, timeout, weight,
+---              keepalive, fetch_interval, sort_type, skip_services,
+---              dump, default_service, shared_size
+---
+--- options: service_scanner (function), preserve_metadata (bool),
+---          key_builder (function(service_name) -> string)
+function _M.create_registry(conf, options)
+    options = options or {}
+    local id = conf.id
+    if not id or id == "" then
+        return nil, "registry id is required"
     end
 
-    if c_index > 0 then
-        consul_server.catalog_index = c_index
+    if registries[id] then
+        _M.stop_registry(id)
     end
 
-    if h_index > 0 then
-        consul_server.health_index = h_index
+    -- build skip service map
+    local skip_map = core.table.new(0, 1)
+    if conf.skip_services then
+        for _, v in ipairs(conf.skip_services) do
+            skip_map[v] = true
+        end
     end
+    for _, v in ipairs(default_skip_services) do
+        skip_map[v] = true
+    end
+
+    -- build default_service
+    local default_svc
+    if conf.default_service then
+        default_svc = conf.default_service

Review Comment:
   Fixed — now cloning default_service with a shallow copy before setting 
weight.



##########
apisix/discovery/consul/init.lua:
##########
@@ -252,496 +223,410 @@ local function show_dump_file()
 end
 
 
-local function get_retry_delay(retry_delay)
-    if not retry_delay or retry_delay >= max_retry_time then
-        retry_delay = 1
-    else
-        retry_delay = retry_delay * 4
+-- ─── polling loop ─────────────────────────────────────────────────────
+
+local function check_keepalive(reg, consul_server, retry_delay)
+    if reg.stop_flag then
+        return
     end
 
-    return retry_delay
+    if consul_server.keepalive and not exiting() then
+        local ok, err = ngx_timer_at(0, _M.connect, reg, consul_server, 
retry_delay)
+        if not ok then
+            log.error("create ngx_timer_at got error: ", err)
+            return
+        end
+    end
 end
 
 
-local function get_opts(consul_server, is_catalog)
-    local opts = {
-        host = consul_server.host,
-        port = consul_server.port,
-        connect_timeout = consul_server.connect_timeout,
-        read_timeout = consul_server.read_timeout,
-        default_args = {
-            token = consul_server.token,
-        }
-    }
-    if not consul_server.keepalive then
-        return opts
+function _M.connect(premature, reg, consul_server, retry_delay)
+    if premature or reg.stop_flag then
+        return
     end
 
-    opts.default_args.wait = consul_server.wait_timeout --blocked wait!=0; unblocked by wait=0
+    local catalog_thread, spawn_catalog_err = 
thread_spawn(consul_client.watch_catalog,
+                                                           consul_server)
+    if not catalog_thread then
+        local random_delay = math_random(default_random_range)
+        log.error("failed to spawn thread watch catalog: ", spawn_catalog_err,
+            ", retry connecting consul after ", random_delay, " seconds")
+        core_sleep(random_delay)
 
-    if is_catalog then
-        opts.default_args.index = consul_server.catalog_index
-    else
-        opts.default_args.index = consul_server.health_index
+        check_keepalive(reg, consul_server, retry_delay)
+        return
     end
 
-    return opts
-end
-
-
-local function watch_catalog(consul_server)
-    local client = resty_consul:new(get_opts(consul_server, true))
-
-    ::RETRY::
-    local watch_result, watch_err = 
client:get(consul_server.consul_watch_catalog_url)
-    local watch_error_info = (watch_err ~= nil and watch_err)
-                             or ((watch_result ~= nil and watch_result.status 
~= 200)
-                             and watch_result.status)
-    if watch_error_info then
-        log.error("connect consul: ", consul_server.consul_server_url,
-            " by sub url: ", consul_server.consul_watch_catalog_url,
-            ", got watch result: ", json_delay_encode(watch_result),
-            ", with error: ", watch_error_info)
+    local health_thread, err = thread_spawn(consul_client.watch_health, 
consul_server)
+    if not health_thread then
+        thread_kill(catalog_thread)
+        local random_delay = math_random(default_random_range)
+        log.error("failed to spawn thread watch health: ", err,
+            ", retry connecting consul after ", random_delay, " seconds")
+        core_sleep(random_delay)
 
-        return watch_type_catalog, default_catalog_error_index
+        check_keepalive(reg, consul_server, retry_delay)
+        return
     end
 
-    if consul_server.catalog_index > 0
-            and consul_server.catalog_index == 
tonumber(watch_result.headers['X-Consul-Index']) then
+    local thread_wait_ok, watch_type, index = thread_wait(catalog_thread, 
health_thread)
+    thread_kill(catalog_thread)
+    thread_kill(health_thread)
+    if not thread_wait_ok then
         local random_delay = math_random(default_random_range)
-        log.info("watch catalog has no change, re-watch consul after ", 
random_delay, " seconds")
+        log.error("failed to wait thread: ", watch_type, ", retry connecting consul after ",
+                random_delay, " seconds")
         core_sleep(random_delay)
-        goto RETRY
+
+        check_keepalive(reg, consul_server, retry_delay)
+        return
     end
 
-    return watch_type_catalog, watch_result.headers['X-Consul-Index']
-end
+    if not consul_client.watch_result_is_valid(tonumber(watch_type),
+            tonumber(index), consul_server.catalog_index, 
consul_server.health_index) then
+        retry_delay = consul_client.get_retry_delay(retry_delay)
+        log.warn("get all svcs got err, retry connecting consul after ", 
retry_delay, " seconds")
+        core_sleep(retry_delay)
 
+        check_keepalive(reg, consul_server, retry_delay)
+        return
+    end
 
-local function watch_health(consul_server)
-    local client = resty_consul:new(get_opts(consul_server, false))
+    if reg.stop_flag then
+        return
+    end
 
-    ::RETRY::
-    local watch_result, watch_err = 
client:get(consul_server.consul_watch_health_url)
-    local watch_error_info = (watch_err ~= nil and watch_err)
-            or ((watch_result ~= nil and watch_result.status ~= 200)
-            and watch_result.status)
-    if watch_error_info then
-        log.error("connect consul: ", consul_server.consul_server_url,
-            " by sub url: ", consul_server.consul_watch_health_url,
-            ", got watch result: ", json_delay_encode(watch_result),
-            ", with error: ", watch_error_info)
+    local up_services, fetch_err, new_catalog_index, new_health_index =
+        consul_client.fetch_services_from_server(consul_server, {
+            default_weight    = reg.conf.weight,
+            sort_type         = reg.conf.sort_type,
+            skip_service_map  = reg.skip_service_map,
+            preserve_metadata = reg.preserve_metadata,
+            key_builder       = reg.key_builder,
+        })
 
-        return watch_type_health, default_health_error_index
-    end
+    if fetch_err then
+        retry_delay = consul_client.get_retry_delay(retry_delay)
+        log.warn("get all svcs got err: ", fetch_err,
+                 ", retry connecting consul after ", retry_delay, " seconds")
+        core_sleep(retry_delay)
 
-    if consul_server.health_index > 0
-            and consul_server.health_index == 
tonumber(watch_result.headers['X-Consul-Index']) then
-        local random_delay = math_random(default_random_range)
-        log.info("watch health has no change, re-watch consul after ", 
random_delay, " seconds")
-        core_sleep(random_delay)
-        goto RETRY
+        check_keepalive(reg, consul_server, retry_delay)
+        return
     end
 
-    return watch_type_health, watch_result.headers['X-Consul-Index']
-end
+    if reg.stop_flag then
+        return
+    end
 
+    -- only update if there are actual services (empty table means no index change)
+    if next(up_services) then
+        update_all_services(reg, consul_server.consul_server_url, up_services)
 
-local function check_keepalive(consul_server, retry_delay)
-    if consul_server.keepalive and not exiting() then
-        local ok, err = ngx_timer_at(0, _M.connect, consul_server, retry_delay)
-        if not ok then
-            log.error("create ngx_timer_at got error: ", err)
-            return
+        if reg.dump_params then
+            ngx_timer_at(0, write_dump_services, reg)
         end
     end
+
+    consul_client.update_index(consul_server, new_catalog_index, 
new_health_index)
+    check_keepalive(reg, consul_server, retry_delay)
 end
 
 
-local function update_index(consul_server, catalog_index, health_index)
-    local c_index = 0
-    local h_index = 0
-    if catalog_index ~= nil then
-        c_index = tonumber(catalog_index)
-    end
+-- ─── Registry management API ──────────────────────────────────────────
 
-    if health_index ~= nil then
-        h_index = tonumber(health_index)
+--- Create a consul registry instance.
+---
+--- conf fields: id, servers (array of URLs), token, timeout, weight,
+---              keepalive, fetch_interval, sort_type, skip_services,
+---              dump, default_service, shared_size
+---
+--- options: service_scanner (function), preserve_metadata (bool),
+---          key_builder (function(service_name) -> string)
+function _M.create_registry(conf, options)
+    options = options or {}
+    local id = conf.id
+    if not id or id == "" then
+        return nil, "registry id is required"
     end
 
-    if c_index > 0 then
-        consul_server.catalog_index = c_index
+    if registries[id] then
+        _M.stop_registry(id)
     end
 
-    if h_index > 0 then
-        consul_server.health_index = h_index
+    -- build skip service map
+    local skip_map = core.table.new(0, 1)
+    if conf.skip_services then
+        for _, v in ipairs(conf.skip_services) do
+            skip_map[v] = true
+        end
     end
+    for _, v in ipairs(default_skip_services) do
+        skip_map[v] = true
+    end
+
+    -- build default_service
+    local default_svc
+    if conf.default_service then
+        default_svc = conf.default_service
+        default_svc.weight = conf.weight
+    end
+
+    local reg = {
+        id                = id,
+        conf              = conf,
+        stop_flag         = false,
+        preserve_metadata = options.preserve_metadata or false,
+        key_builder       = options.key_builder or default_key_builder(id),
+        service_scanner   = options.service_scanner or 
consul_client.get_consul_services,
+        skip_service_map  = skip_map,
+        default_service   = default_svc,
+        dump_params       = conf.dump,
+        consul_services   = core.table.new(0, 1),
+    }
+
+    registries[id] = reg
+    return reg
 end
 
 
-local function is_not_empty(value)
-    if value == nil or value == null
-            or (type(value) == "table" and not next(value))
-            or (type(value) == "string" and value == "")
-    then
-        return false
+function _M.start_registry(reg)
+    local dict = get_dict()
+    if not dict then
+        error('lua_shared_dict "' .. dict_name .. '" not configured')
     end
 
-    return true
-end
+    -- flush stale data for this registry
+    local prefix = reg.id .. "/"
+    local all_keys = dict:get_keys(0)
+    for _, key in ipairs(all_keys) do
+        if core.string.has_prefix(key, prefix) then
+            dict:delete(key)
+        end
+    end
 
+    -- load dump file if configured
+    if reg.dump_params and reg.dump_params.load_on_init then
+        read_dump_services(reg)
+    end
 
-local function watch_result_is_valid(watch_type, index, catalog_index, 
health_index)
-    if index <= 0 then
-        return false
+    local consul_servers_list, err = 
consul_client.format_consul_params(reg.conf)
+    if err then
+        error("format consul config got error: " .. err)
     end
+    log.info("consul_server_list: ", json_delay_encode(consul_servers_list, 
true))
 
-    if watch_type == watch_type_catalog then
-        if index == catalog_index then
-            return false
+    for _, server in ipairs(consul_servers_list) do
+        local ok, timer_err = ngx_timer_at(0, _M.connect, reg, server)
+        if not ok then
+            error("create consul got error: " .. timer_err)
         end
-    else
-        if index == health_index then
-            return false
+
+        if server.keepalive == false then
+            ngx_timer_every(server.fetch_interval, _M.connect, reg, server)
         end
     end
-
-    return true
 end
 
 
-local function combine_sort_nodes_cmp(left, right)
-    if left.host ~= right.host then
-        return left.host < right.host
+function _M.stop_registry(id)
+    local reg = registries[id]
+    if not reg then
+        return
     end
 
-    return left.port < right.port
-end
-
+    reg.stop_flag = true
+    registries[id] = nil
 
-local function port_sort_nodes_cmp(left, right)
-    return left.port < right.port
+    local dict = get_dict()
+    if dict then
+        local prefix = id .. "/"
+        local all_keys = dict:get_keys(0)
+        for _, key in ipairs(all_keys) do
+            if core.string.has_prefix(key, prefix) then
+                dict:delete(key)
+            end
+        end
+    end
 end
 
 
-local function host_sort_nodes_cmp(left, right)
-    return left.host < right.host
+function _M.get_registry(id)
+    return registries[id]
 end
 
 
-function _M.connect(premature, consul_server, retry_delay)
-    if premature then
-        return
-    end
-
-    local catalog_thread, spawn_catalog_err = thread_spawn(watch_catalog, 
consul_server)
-    if not catalog_thread then
-        local random_delay = math_random(default_random_range)
-        log.error("failed to spawn thread watch catalog: ", spawn_catalog_err,
-            ", retry connecting consul after ", random_delay, " seconds")
-        core_sleep(random_delay)
+-- ─── Shared helpers ──────────────────────────────────────────────────
 
-        check_keepalive(consul_server, retry_delay)
-        return
+local function match_metadata(node_metadata, upstream_metadata)
+    if upstream_metadata == nil then
+        return true
     end
 
-    local health_thread, err = thread_spawn(watch_health, consul_server)
-    if not health_thread then
-        thread_kill(catalog_thread)
-        local random_delay = math_random(default_random_range)
-        log.error("failed to spawn thread watch health: ", err, ", retry connecting consul after ",
-            random_delay, " seconds")
-        core_sleep(random_delay)
-
-        check_keepalive(consul_server, retry_delay)
-        return
+    if not node_metadata then
+        node_metadata = {}
     end
 
-    local thread_wait_ok, watch_type, index = thread_wait(catalog_thread, 
health_thread)
-    thread_kill(catalog_thread)
-    thread_kill(health_thread)
-    if not thread_wait_ok then
-        local random_delay = math_random(default_random_range)
-        log.error("failed to wait thread: ", watch_type, ", retry connecting consul after ",
-                random_delay, " seconds")
-        core_sleep(random_delay)
-
-        check_keepalive(consul_server, retry_delay)
-        return
+    for k, v in pairs(upstream_metadata) do
+        if not node_metadata[k] or node_metadata[k] ~= v then
+            return false
+        end
     end
 
-    -- double check index has changed
-    if not watch_result_is_valid(tonumber(watch_type),
-            tonumber(index), consul_server.catalog_index, 
consul_server.health_index) then
-        retry_delay = get_retry_delay(retry_delay)
-        log.warn("get all svcs got err, retry connecting consul after ", 
retry_delay, " seconds")
-        core_sleep(retry_delay)
+    return true
+end
 
-        check_keepalive(consul_server, retry_delay)
-        return
-    end
 
-    local consul_client = resty_consul:new({
-        host = consul_server.host,
-        port = consul_server.port,
-        connect_timeout = consul_server.connect_timeout,
-        read_timeout = consul_server.read_timeout,
-        default_args = {
-            token = consul_server.token
-        }
-    })
-    local catalog_success, catalog_res, catalog_err = pcall(function()
-        return consul_client:get(consul_server.consul_watch_catalog_url)
-    end)
-    if not catalog_success then
-        log.error("connect consul: ", consul_server.consul_server_url,
-            " by sub url: ", consul_server.consul_watch_catalog_url,
-            ", got catalog result: ", json_delay_encode(catalog_res))
-        check_keepalive(consul_server, retry_delay)
-        return
+function _M.get_nodes(key, metadata)
+    local dict = get_dict()
+    if not dict then
+        return nil
     end
-    local catalog_error_info = (catalog_err ~= nil and catalog_err)
-            or ((catalog_res ~= nil and catalog_res.status ~= 200)
-            and catalog_res.status)
-    if catalog_error_info then
-        log.error("connect consul: ", consul_server.consul_server_url,
-            " by sub url: ", consul_server.consul_watch_catalog_url,
-            ", got catalog result: ", json_delay_encode(catalog_res),
-            ", with error: ", catalog_error_info)
-
-        retry_delay = get_retry_delay(retry_delay)
-        log.warn("get all svcs got err, retry connecting consul after ", 
retry_delay, " seconds")
-        core_sleep(retry_delay)
 
-        check_keepalive(consul_server, retry_delay)
-        return
+    local value = dict:get(key)
+    if not value then
+        return nil
     end
 
-    -- get health index
-    local success, health_res, health_err = pcall(function()
-        return consul_client:get(consul_server.consul_watch_health_url)
-    end)
-    if not success then
-        log.error("connect consul: ", consul_server.consul_server_url,
-            " by sub url: ", consul_server.consul_watch_health_url,
-            ", got health result: ", json_delay_encode(health_res))
-        check_keepalive(consul_server, retry_delay)
-        return
+    local nodes = core.json.decode(value)
+    if not metadata then
+        return nodes
     end
-    local health_error_info = (health_err ~= nil and health_err)
-            or ((health_res ~= nil and health_res.status ~= 200)
-            and health_res.status)
-    if health_error_info then
-        log.error("connect consul: ", consul_server.consul_server_url,
-            " by sub url: ", consul_server.consul_watch_health_url,
-            ", got health result: ", json_delay_encode(health_res),
-            ", with error: ", health_error_info)
-
-        retry_delay = get_retry_delay(retry_delay)
-        log.warn("get all svcs got err, retry connecting consul after ", 
retry_delay, " seconds")
-        core_sleep(retry_delay)
 
-        check_keepalive(consul_server, retry_delay)
-        return
+    local res = {}
+    for _, node in ipairs(nodes) do
+        if match_metadata(node.metadata, metadata) then
+            core.table.insert(res, node)
+        end
     end
+    return res
+end
 
-    log.info("connect consul: ", consul_server.consul_server_url,
-        ", catalog_result status: ", catalog_res.status,
-        ", catalog_result.headers.index: ", 
catalog_res.headers['X-Consul-Index'],
-        ", consul_server.index: ", consul_server.index,
-        ", consul_server: ", json_delay_encode(consul_server))
-
-    -- if the current index is different from the last index, then update the service
-    if (consul_server.catalog_index ~= 
tonumber(catalog_res.headers['X-Consul-Index']))
-            or (consul_server.health_index ~= 
tonumber(health_res.headers['X-Consul-Index'])) then
-        local up_services = core.table.new(0, #catalog_res.body)
-        for service_name, _ in pairs(catalog_res.body) do
-            -- check if the service_name is 'skip service'
-            if skip_service_map[service_name] then
-                goto CONTINUE
-            end
 
-            -- get node from service
-            local svc_url = consul_server.consul_sub_url .. "/" .. service_name
-            local svc_success, result, get_err = pcall(function()
-                return consul_client:get(svc_url, {passing = true})
-            end)
-            local error_info = (get_err ~= nil and get_err) or
-                    ((result ~= nil and result.status ~= 200) and 
result.status)
-            if not svc_success or error_info then
-                log.error("connect consul: ", consul_server.consul_server_url,
-                    ", by service url: ", svc_url, ", with error: ", 
error_info)
-                goto CONTINUE
-            end
+-- ─── Standard discovery interface ─────────────────────────────────────
 
-            -- decode body, decode json, update service, error handling
-            -- check result body is not nil and not empty
-            if is_not_empty(result.body) then
-                -- add services to table
-                local nodes = up_services[service_name]
-                local nodes_uniq = {}
-                for _, node in ipairs(result.body) do
-                    if not node.Service then
-                        goto CONTINUE
-                    end
-
-                    local svc_address, svc_port = node.Service.Address, 
node.Service.Port
-                    -- Handle nil or 0 port case - default to 80 for HTTP services
-                    if not svc_port or svc_port == 0 then
-                        svc_port = 80
-                    end
-                    -- if nodes is nil, new nodes table and set to up_services
-                    if not nodes then
-                        nodes = core.table.new(1, 0)
-                        up_services[service_name] = nodes
-                    end
-                    -- not store duplicate service IDs.
-                    local service_id = svc_address .. ":" .. svc_port
-                    if not nodes_uniq[service_id] then
-                        -- add node to nodes table
-                        core.table.insert(nodes, {
-                            host = svc_address,
-                            port = tonumber(svc_port),
-                            weight = default_weight,
-                        })
-                        nodes_uniq[service_id] = true
-                    end
-                end
-                if nodes then
-                    if sort_type == "port_sort" then
-                        core.table.sort(nodes, port_sort_nodes_cmp)
+local function fetch_node_from_shdict(service_name)
+    local dict = get_dict()
+    if not dict then
+        return nil, "consul shared dict not available"
+    end
 
-                    elseif sort_type == "host_sort" then
-                        core.table.sort(nodes, host_sort_nodes_cmp)
+    -- look up with the default registry prefix
+    local value = dict:get("default/" .. service_name)
+    if not value then
+        return nil, "consul service not found: " .. service_name
+    end
 
-                    elseif sort_type == "combine_sort" then
-                        core.table.sort(nodes, combine_sort_nodes_cmp)
+    local nodes, err = core.json.decode(value)
+    if not nodes then
+        return nil, "failed to decode nodes for service: "
+                    .. service_name .. ", error: " .. (err or "")
+    end
 
-                    end
-                end
-                up_services[service_name] = nodes
-            end
-            :: CONTINUE ::
-        end
+    return nodes
+end
 
-        update_all_services(consul_server.consul_server_url, up_services)
 
-        if dump_params then
-            ngx_timer_at(0, write_dump_services)
-        end
+function _M.nodes(service_name)
+    local default_reg = registries["default"]
+    local default_svc = default_reg and default_reg.default_service
 
-        update_index(consul_server,
-                catalog_res.headers['X-Consul-Index'],
-                health_res.headers['X-Consul-Index'])
+    local nodes, err = nodes_cache(service_name, nil,
+                                   fetch_node_from_shdict, service_name)
+    if not nodes then
+        log.error("fetch nodes failed by ", service_name, ", error: ", err)
+        return default_svc and {default_svc}
     end
 
-    check_keepalive(consul_server, retry_delay)
+    log.info("process id: ", ngx_worker_id(), ", [", service_name, "] = ",
+        json_delay_encode(nodes, true))
+
+    return nodes
 end
 
 
-local function format_consul_params(consul_conf)
-    local consul_server_list = core.table.new(0, #consul_conf.servers)
+function _M.all_nodes()
+    local dict = get_dict()
+    if not dict then
+        return {}
+    end
+
+    local keys = dict:get_keys(0)
+    local services = core.table.new(0, #keys)
+    local prefix = "default/"
+    for i, key in ipairs(keys) do
+        -- only return default registry nodes, strip prefix for BC
+        if core.string.has_prefix(key, prefix) then
+            local value = dict:get(key)
+            if value then
+                local nodes, err = core.json.decode(value)
+                if nodes then
+                    local bare_key = key:sub(#prefix + 1)
+                    services[bare_key] = nodes
+                else
+                    log.error("failed to decode nodes for service: ", key, ", 
error: ", err)
+                end
+            end
+        end
 
-    for _, v in pairs(consul_conf.servers) do
-        local scheme, host, port, path = unpack(http.parse_uri(nil, v))
-        if scheme ~= "http" then
-            return nil, "only support consul http schema address, eg: 
http://address:port";
-        elseif path ~= "/" or core.string.has_suffix(v, '/') then
-            return nil, "invalid consul server address, the valid format: 
http://address:port";
+        if i % 100 == 0 then
+            ngx.sleep(0)
         end
-        core.table.insert(consul_server_list, {
-            host = host,
-            port = port,
-            token = consul_conf.token,
-            connect_timeout = consul_conf.timeout.connect,
-            read_timeout = consul_conf.timeout.read,
-            wait_timeout = consul_conf.timeout.wait,
-            consul_watch_catalog_url = "/catalog/services",
-            consul_sub_url = "/health/service",
-            consul_watch_health_url = "/health/state/any",
-            consul_server_url = v .. "/v1",
-            weight = consul_conf.weight,
-            keepalive = consul_conf.keepalive,
-            health_index = 0,
-            catalog_index = 0,
-            fetch_interval = consul_conf.fetch_interval -- fetch interval to 
next connect consul
-        })
     end
-    return consul_server_list, nil
+    return services
 end
 
 
+-- ─── Initialization ───────────────────────────────────────────────────
+
 function _M.init_worker()
-    local consul_conf = local_conf.discovery.consul
-    dump_params = consul_conf.dump
+    local consul_conf = local_conf.discovery and local_conf.discovery.consul
+    if not consul_conf then
+        return
+    end
 
-    default_weight = consul_conf.weight
-    sort_type = consul_conf.sort_type
-    -- set default service, used when the server node cannot be found
-    if consul_conf.default_service then
-        default_service = consul_conf.default_service
-        default_service.weight = default_weight
+    local dict = ngx.shared[dict_name]
+    if not dict then
+        error('lua_shared_dict "' .. dict_name .. '" not configured')
     end
+    consul_dict = dict
 
     if process.type() ~= "privileged agent" then
         return
     end
 
-    -- flush stale data that may persist across reloads,
-    -- since consul_services is re-initialized empty
-    consul_dict:flush_all()
-
-    if consul_conf.dump then
-        if consul_conf.dump.load_on_init then
-            read_dump_services()
-        end
-    end
+    -- flush stale data that may persist across reloads
+    dict:flush_all()
 
     log.notice("consul_conf: ", json_delay_encode(consul_conf, true))
 
-    if consul_conf.skip_services then
-        skip_service_map = core.table.new(0, #consul_conf.skip_services)
-        for _, v in ipairs(consul_conf.skip_services) do
-            skip_service_map[v] = true
-        end
-    end
-    -- set up default skip service
-    for _, v in ipairs(default_skip_services) do
-        skip_service_map[v] = true
-    end
-
-    local consul_servers_list, err = format_consul_params(consul_conf)
-    if err then
-        error("format consul config got error: " .. err)
+    -- shallow copy to avoid mutating cached config
+    local conf = {}
+    for k, v in pairs(consul_conf) do
+        conf[k] = v
     end
-    log.info("consul_server_list: ", json_delay_encode(consul_servers_list, 
true))
-
-    consul_services = core.table.new(0, 1)
-    -- success or failure
-    for _, server in ipairs(consul_servers_list) do
-        local ok, err = ngx_timer_at(0, _M.connect, server)
-        if not ok then
-            error("create consul got error: " .. err)
-        end
+    conf.id = "default"
 
-        if server.keepalive == false then
-            ngx_timer_every(server.fetch_interval, _M.connect, server)
-        end
-    end
+    local reg = _M.create_registry(conf)
+    _M.start_registry(reg)

Review Comment:
   Fixed — the default registry is now created on all workers; only
   start_registry runs on the privileged agent.



##########
apisix/discovery/consul/init.lua:
##########
@@ -252,496 +223,410 @@ local function show_dump_file()
 end
 
 
-local function get_retry_delay(retry_delay)
-    if not retry_delay or retry_delay >= max_retry_time then
-        retry_delay = 1
-    else
-        retry_delay = retry_delay * 4
+-- ─── polling loop ─────────────────────────────────────────────────────
+
+local function check_keepalive(reg, consul_server, retry_delay)
+    if reg.stop_flag then
+        return
     end
 
-    return retry_delay
+    if consul_server.keepalive and not exiting() then
+        local ok, err = ngx_timer_at(0, _M.connect, reg, consul_server, 
retry_delay)
+        if not ok then
+            log.error("create ngx_timer_at got error: ", err)
+            return
+        end
+    end
 end
 
 
-local function get_opts(consul_server, is_catalog)
-    local opts = {
-        host = consul_server.host,
-        port = consul_server.port,
-        connect_timeout = consul_server.connect_timeout,
-        read_timeout = consul_server.read_timeout,
-        default_args = {
-            token = consul_server.token,
-        }
-    }
-    if not consul_server.keepalive then
-        return opts
+function _M.connect(premature, reg, consul_server, retry_delay)
+    if premature or reg.stop_flag then
+        return
     end
 
-    opts.default_args.wait = consul_server.wait_timeout --blocked wait!=0; 
unblocked by wait=0
+    local catalog_thread, spawn_catalog_err = 
thread_spawn(consul_client.watch_catalog,
+                                                           consul_server)
+    if not catalog_thread then
+        local random_delay = math_random(default_random_range)
+        log.error("failed to spawn thread watch catalog: ", spawn_catalog_err,
+            ", retry connecting consul after ", random_delay, " seconds")
+        core_sleep(random_delay)
 
-    if is_catalog then
-        opts.default_args.index = consul_server.catalog_index
-    else
-        opts.default_args.index = consul_server.health_index
+        check_keepalive(reg, consul_server, retry_delay)
+        return
     end
 
-    return opts
-end
-
-
-local function watch_catalog(consul_server)
-    local client = resty_consul:new(get_opts(consul_server, true))
-
-    ::RETRY::
-    local watch_result, watch_err = 
client:get(consul_server.consul_watch_catalog_url)
-    local watch_error_info = (watch_err ~= nil and watch_err)
-                             or ((watch_result ~= nil and watch_result.status 
~= 200)
-                             and watch_result.status)
-    if watch_error_info then
-        log.error("connect consul: ", consul_server.consul_server_url,
-            " by sub url: ", consul_server.consul_watch_catalog_url,
-            ", got watch result: ", json_delay_encode(watch_result),
-            ", with error: ", watch_error_info)
+    local health_thread, err = thread_spawn(consul_client.watch_health, 
consul_server)
+    if not health_thread then
+        thread_kill(catalog_thread)
+        local random_delay = math_random(default_random_range)
+        log.error("failed to spawn thread watch health: ", err,
+            ", retry connecting consul after ", random_delay, " seconds")
+        core_sleep(random_delay)
 
-        return watch_type_catalog, default_catalog_error_index
+        check_keepalive(reg, consul_server, retry_delay)
+        return
     end
 
-    if consul_server.catalog_index > 0
-            and consul_server.catalog_index == 
tonumber(watch_result.headers['X-Consul-Index']) then
+    local thread_wait_ok, watch_type, index = thread_wait(catalog_thread, 
health_thread)
+    thread_kill(catalog_thread)
+    thread_kill(health_thread)
+    if not thread_wait_ok then
         local random_delay = math_random(default_random_range)
-        log.info("watch catalog has no change, re-watch consul after ", 
random_delay, " seconds")
+        log.error("failed to wait thread: ", watch_type, ", retry connecting 
consul after ",
+                random_delay, " seconds")
         core_sleep(random_delay)
-        goto RETRY
+
+        check_keepalive(reg, consul_server, retry_delay)
+        return
     end
 
-    return watch_type_catalog, watch_result.headers['X-Consul-Index']
-end
+    if not consul_client.watch_result_is_valid(tonumber(watch_type),
+            tonumber(index), consul_server.catalog_index, 
consul_server.health_index) then
+        retry_delay = consul_client.get_retry_delay(retry_delay)
+        log.warn("get all svcs got err, retry connecting consul after ", 
retry_delay, " seconds")
+        core_sleep(retry_delay)
 
+        check_keepalive(reg, consul_server, retry_delay)
+        return
+    end
 
-local function watch_health(consul_server)
-    local client = resty_consul:new(get_opts(consul_server, false))
+    if reg.stop_flag then
+        return
+    end
 
-    ::RETRY::
-    local watch_result, watch_err = 
client:get(consul_server.consul_watch_health_url)
-    local watch_error_info = (watch_err ~= nil and watch_err)
-            or ((watch_result ~= nil and watch_result.status ~= 200)
-            and watch_result.status)
-    if watch_error_info then
-        log.error("connect consul: ", consul_server.consul_server_url,
-            " by sub url: ", consul_server.consul_watch_health_url,
-            ", got watch result: ", json_delay_encode(watch_result),
-            ", with error: ", watch_error_info)
+    local up_services, fetch_err, new_catalog_index, new_health_index =
+        consul_client.fetch_services_from_server(consul_server, {
+            default_weight    = reg.conf.weight,
+            sort_type         = reg.conf.sort_type,
+            skip_service_map  = reg.skip_service_map,
+            preserve_metadata = reg.preserve_metadata,
+            key_builder       = reg.key_builder,
+        })
 
-        return watch_type_health, default_health_error_index
-    end
+    if fetch_err then
+        retry_delay = consul_client.get_retry_delay(retry_delay)
+        log.warn("get all svcs got err: ", fetch_err,
+                 ", retry connecting consul after ", retry_delay, " seconds")
+        core_sleep(retry_delay)
 
-    if consul_server.health_index > 0
-            and consul_server.health_index == 
tonumber(watch_result.headers['X-Consul-Index']) then
-        local random_delay = math_random(default_random_range)
-        log.info("watch health has no change, re-watch consul after ", 
random_delay, " seconds")
-        core_sleep(random_delay)
-        goto RETRY
+        check_keepalive(reg, consul_server, retry_delay)
+        return
     end
 
-    return watch_type_health, watch_result.headers['X-Consul-Index']
-end
+    if reg.stop_flag then
+        return
+    end
 
+    -- only update if there are actual services (empty table means no index 
change)
+    if next(up_services) then
+        update_all_services(reg, consul_server.consul_server_url, up_services)
 
-local function check_keepalive(consul_server, retry_delay)
-    if consul_server.keepalive and not exiting() then
-        local ok, err = ngx_timer_at(0, _M.connect, consul_server, retry_delay)
-        if not ok then
-            log.error("create ngx_timer_at got error: ", err)
-            return
+        if reg.dump_params then
+            ngx_timer_at(0, write_dump_services, reg)
         end
     end
+
+    consul_client.update_index(consul_server, new_catalog_index, 
new_health_index)
+    check_keepalive(reg, consul_server, retry_delay)
 end
 
 
-local function update_index(consul_server, catalog_index, health_index)
-    local c_index = 0
-    local h_index = 0
-    if catalog_index ~= nil then
-        c_index = tonumber(catalog_index)
-    end
+-- ─── Registry management API ──────────────────────────────────────────
 
-    if health_index ~= nil then
-        h_index = tonumber(health_index)
+--- Create a consul registry instance.
+---
+--- conf fields: id, servers (array of URLs), token, timeout, weight,
+---              keepalive, fetch_interval, sort_type, skip_services,
+---              dump, default_service, shared_size
+---
+--- options: service_scanner (function), preserve_metadata (bool),
+---          key_builder (function(service_name) -> string)
+function _M.create_registry(conf, options)
+    options = options or {}
+    local id = conf.id
+    if not id or id == "" then
+        return nil, "registry id is required"
     end
 
-    if c_index > 0 then
-        consul_server.catalog_index = c_index
+    if registries[id] then
+        _M.stop_registry(id)
     end
 
-    if h_index > 0 then
-        consul_server.health_index = h_index
+    -- build skip service map
+    local skip_map = core.table.new(0, 1)
+    if conf.skip_services then
+        for _, v in ipairs(conf.skip_services) do
+            skip_map[v] = true
+        end
     end
+    for _, v in ipairs(default_skip_services) do
+        skip_map[v] = true
+    end
+
+    -- build default_service
+    local default_svc
+    if conf.default_service then
+        default_svc = conf.default_service
+        default_svc.weight = conf.weight
+    end
+
+    local reg = {
+        id                = id,
+        conf              = conf,
+        stop_flag         = false,
+        preserve_metadata = options.preserve_metadata or false,
+        key_builder       = options.key_builder or default_key_builder(id),
+        service_scanner   = options.service_scanner or 
consul_client.get_consul_services,
+        skip_service_map  = skip_map,
+        default_service   = default_svc,
+        dump_params       = conf.dump,
+        consul_services   = core.table.new(0, 1),
+    }
+
+    registries[id] = reg
+    return reg
 end
 
 
-local function is_not_empty(value)
-    if value == nil or value == null
-            or (type(value) == "table" and not next(value))
-            or (type(value) == "string" and value == "")
-    then
-        return false
+function _M.start_registry(reg)
+    local dict = get_dict()
+    if not dict then
+        error('lua_shared_dict "' .. dict_name .. '" not configured')
     end
 
-    return true
-end
+    -- flush stale data for this registry
+    local prefix = reg.id .. "/"
+    local all_keys = dict:get_keys(0)
+    for _, key in ipairs(all_keys) do
+        if core.string.has_prefix(key, prefix) then
+            dict:delete(key)
+        end
+    end
 
+    -- load dump file if configured
+    if reg.dump_params and reg.dump_params.load_on_init then
+        read_dump_services(reg)
+    end
 
-local function watch_result_is_valid(watch_type, index, catalog_index, 
health_index)
-    if index <= 0 then
-        return false
+    local consul_servers_list, err = 
consul_client.format_consul_params(reg.conf)
+    if err then
+        error("format consul config got error: " .. err)
     end
+    log.info("consul_server_list: ", json_delay_encode(consul_servers_list, 
true))
 
-    if watch_type == watch_type_catalog then
-        if index == catalog_index then
-            return false
+    for _, server in ipairs(consul_servers_list) do
+        local ok, timer_err = ngx_timer_at(0, _M.connect, reg, server)
+        if not ok then
+            error("create consul got error: " .. timer_err)
         end
-    else
-        if index == health_index then
-            return false
+
+        if server.keepalive == false then
+            ngx_timer_every(server.fetch_interval, _M.connect, reg, server)
         end
     end
-
-    return true
 end
 
 
-local function combine_sort_nodes_cmp(left, right)
-    if left.host ~= right.host then
-        return left.host < right.host
+function _M.stop_registry(id)
+    local reg = registries[id]
+    if not reg then
+        return
     end
 
-    return left.port < right.port
-end
-
+    reg.stop_flag = true
+    registries[id] = nil
 
-local function port_sort_nodes_cmp(left, right)
-    return left.port < right.port
+    local dict = get_dict()
+    if dict then
+        local prefix = id .. "/"
+        local all_keys = dict:get_keys(0)
+        for _, key in ipairs(all_keys) do
+            if core.string.has_prefix(key, prefix) then
+                dict:delete(key)
+            end
+        end
+    end
 end
 
 
-local function host_sort_nodes_cmp(left, right)
-    return left.host < right.host
+function _M.get_registry(id)
+    return registries[id]
 end
 
 
-function _M.connect(premature, consul_server, retry_delay)
-    if premature then
-        return
-    end
-
-    local catalog_thread, spawn_catalog_err = thread_spawn(watch_catalog, 
consul_server)
-    if not catalog_thread then
-        local random_delay = math_random(default_random_range)
-        log.error("failed to spawn thread watch catalog: ", spawn_catalog_err,
-            ", retry connecting consul after ", random_delay, " seconds")
-        core_sleep(random_delay)
+-- ─── Shared helpers ──────────────────────────────────────────────────
 
-        check_keepalive(consul_server, retry_delay)
-        return
+local function match_metadata(node_metadata, upstream_metadata)
+    if upstream_metadata == nil then
+        return true
     end
 
-    local health_thread, err = thread_spawn(watch_health, consul_server)
-    if not health_thread then
-        thread_kill(catalog_thread)
-        local random_delay = math_random(default_random_range)
-        log.error("failed to spawn thread watch health: ", err, ", retry 
connecting consul after ",
-            random_delay, " seconds")
-        core_sleep(random_delay)
-
-        check_keepalive(consul_server, retry_delay)
-        return
+    if not node_metadata then
+        node_metadata = {}
     end
 
-    local thread_wait_ok, watch_type, index = thread_wait(catalog_thread, 
health_thread)
-    thread_kill(catalog_thread)
-    thread_kill(health_thread)
-    if not thread_wait_ok then
-        local random_delay = math_random(default_random_range)
-        log.error("failed to wait thread: ", watch_type, ", retry connecting 
consul after ",
-                random_delay, " seconds")
-        core_sleep(random_delay)
-
-        check_keepalive(consul_server, retry_delay)
-        return
+    for k, v in pairs(upstream_metadata) do
+        if not node_metadata[k] or node_metadata[k] ~= v then
+            return false
+        end
     end
 
-    -- double check index has changed
-    if not watch_result_is_valid(tonumber(watch_type),
-            tonumber(index), consul_server.catalog_index, 
consul_server.health_index) then
-        retry_delay = get_retry_delay(retry_delay)
-        log.warn("get all svcs got err, retry connecting consul after ", 
retry_delay, " seconds")
-        core_sleep(retry_delay)
+    return true
+end
 
-        check_keepalive(consul_server, retry_delay)
-        return
-    end
 
-    local consul_client = resty_consul:new({
-        host = consul_server.host,
-        port = consul_server.port,
-        connect_timeout = consul_server.connect_timeout,
-        read_timeout = consul_server.read_timeout,
-        default_args = {
-            token = consul_server.token
-        }
-    })
-    local catalog_success, catalog_res, catalog_err = pcall(function()
-        return consul_client:get(consul_server.consul_watch_catalog_url)
-    end)
-    if not catalog_success then
-        log.error("connect consul: ", consul_server.consul_server_url,
-            " by sub url: ", consul_server.consul_watch_catalog_url,
-            ", got catalog result: ", json_delay_encode(catalog_res))
-        check_keepalive(consul_server, retry_delay)
-        return
+function _M.get_nodes(key, metadata)
+    local dict = get_dict()
+    if not dict then
+        return nil
     end
-    local catalog_error_info = (catalog_err ~= nil and catalog_err)
-            or ((catalog_res ~= nil and catalog_res.status ~= 200)
-            and catalog_res.status)
-    if catalog_error_info then
-        log.error("connect consul: ", consul_server.consul_server_url,
-            " by sub url: ", consul_server.consul_watch_catalog_url,
-            ", got catalog result: ", json_delay_encode(catalog_res),
-            ", with error: ", catalog_error_info)
-
-        retry_delay = get_retry_delay(retry_delay)
-        log.warn("get all svcs got err, retry connecting consul after ", 
retry_delay, " seconds")
-        core_sleep(retry_delay)
 
-        check_keepalive(consul_server, retry_delay)
-        return
+    local value = dict:get(key)
+    if not value then
+        return nil
     end
 
-    -- get health index
-    local success, health_res, health_err = pcall(function()
-        return consul_client:get(consul_server.consul_watch_health_url)
-    end)
-    if not success then
-        log.error("connect consul: ", consul_server.consul_server_url,
-            " by sub url: ", consul_server.consul_watch_health_url,
-            ", got health result: ", json_delay_encode(health_res))
-        check_keepalive(consul_server, retry_delay)
-        return
+    local nodes = core.json.decode(value)
+    if not metadata then
+        return nodes
     end
-    local health_error_info = (health_err ~= nil and health_err)
-            or ((health_res ~= nil and health_res.status ~= 200)
-            and health_res.status)
-    if health_error_info then
-        log.error("connect consul: ", consul_server.consul_server_url,
-            " by sub url: ", consul_server.consul_watch_health_url,
-            ", got health result: ", json_delay_encode(health_res),
-            ", with error: ", health_error_info)
-
-        retry_delay = get_retry_delay(retry_delay)
-        log.warn("get all svcs got err, retry connecting consul after ", 
retry_delay, " seconds")
-        core_sleep(retry_delay)
 
-        check_keepalive(consul_server, retry_delay)
-        return
+    local res = {}
+    for _, node in ipairs(nodes) do
+        if match_metadata(node.metadata, metadata) then
+            core.table.insert(res, node)
+        end
     end
+    return res
+end
 
-    log.info("connect consul: ", consul_server.consul_server_url,
-        ", catalog_result status: ", catalog_res.status,
-        ", catalog_result.headers.index: ", 
catalog_res.headers['X-Consul-Index'],
-        ", consul_server.index: ", consul_server.index,
-        ", consul_server: ", json_delay_encode(consul_server))
-
-    -- if the current index is different from the last index, then update the 
service
-    if (consul_server.catalog_index ~= 
tonumber(catalog_res.headers['X-Consul-Index']))
-            or (consul_server.health_index ~= 
tonumber(health_res.headers['X-Consul-Index'])) then
-        local up_services = core.table.new(0, #catalog_res.body)
-        for service_name, _ in pairs(catalog_res.body) do
-            -- check if the service_name is 'skip service'
-            if skip_service_map[service_name] then
-                goto CONTINUE
-            end
 
-            -- get node from service
-            local svc_url = consul_server.consul_sub_url .. "/" .. service_name
-            local svc_success, result, get_err = pcall(function()
-                return consul_client:get(svc_url, {passing = true})
-            end)
-            local error_info = (get_err ~= nil and get_err) or
-                    ((result ~= nil and result.status ~= 200) and 
result.status)
-            if not svc_success or error_info then
-                log.error("connect consul: ", consul_server.consul_server_url,
-                    ", by service url: ", svc_url, ", with error: ", 
error_info)
-                goto CONTINUE
-            end
+-- ─── Standard discovery interface ─────────────────────────────────────
 
-            -- decode body, decode json, update service, error handling
-            -- check result body is not nil and not empty
-            if is_not_empty(result.body) then
-                -- add services to table
-                local nodes = up_services[service_name]
-                local nodes_uniq = {}
-                for _, node in ipairs(result.body) do
-                    if not node.Service then
-                        goto CONTINUE
-                    end
-
-                    local svc_address, svc_port = node.Service.Address, 
node.Service.Port
-                    -- Handle nil or 0 port case - default to 80 for HTTP 
services
-                    if not svc_port or svc_port == 0 then
-                        svc_port = 80
-                    end
-                    -- if nodes is nil, new nodes table and set to up_services
-                    if not nodes then
-                        nodes = core.table.new(1, 0)
-                        up_services[service_name] = nodes
-                    end
-                    -- not store duplicate service IDs.
-                    local service_id = svc_address .. ":" .. svc_port
-                    if not nodes_uniq[service_id] then
-                        -- add node to nodes table
-                        core.table.insert(nodes, {
-                            host = svc_address,
-                            port = tonumber(svc_port),
-                            weight = default_weight,
-                        })
-                        nodes_uniq[service_id] = true
-                    end
-                end
-                if nodes then
-                    if sort_type == "port_sort" then
-                        core.table.sort(nodes, port_sort_nodes_cmp)
+local function fetch_node_from_shdict(service_name)
+    local dict = get_dict()
+    if not dict then
+        return nil, "consul shared dict not available"
+    end
 
-                    elseif sort_type == "host_sort" then
-                        core.table.sort(nodes, host_sort_nodes_cmp)
+    -- look up with the default registry prefix
+    local value = dict:get("default/" .. service_name)
+    if not value then
+        return nil, "consul service not found: " .. service_name
+    end
 
-                    elseif sort_type == "combine_sort" then
-                        core.table.sort(nodes, combine_sort_nodes_cmp)
+    local nodes, err = core.json.decode(value)
+    if not nodes then
+        return nil, "failed to decode nodes for service: "
+                    .. service_name .. ", error: " .. (err or "")
+    end
 
-                    end
-                end
-                up_services[service_name] = nodes
-            end
-            :: CONTINUE ::
-        end
+    return nodes
+end
 
-        update_all_services(consul_server.consul_server_url, up_services)
 
-        if dump_params then
-            ngx_timer_at(0, write_dump_services)
-        end
+function _M.nodes(service_name)
+    local default_reg = registries["default"]
+    local default_svc = default_reg and default_reg.default_service
 
-        update_index(consul_server,
-                catalog_res.headers['X-Consul-Index'],
-                health_res.headers['X-Consul-Index'])
+    local nodes, err = nodes_cache(service_name, nil,
+                                   fetch_node_from_shdict, service_name)
+    if not nodes then
+        log.error("fetch nodes failed by ", service_name, ", error: ", err)
+        return default_svc and {default_svc}
     end
 
-    check_keepalive(consul_server, retry_delay)
+    log.info("process id: ", ngx_worker_id(), ", [", service_name, "] = ",
+        json_delay_encode(nodes, true))
+
+    return nodes
 end
 
 
-local function format_consul_params(consul_conf)
-    local consul_server_list = core.table.new(0, #consul_conf.servers)
+function _M.all_nodes()
+    local dict = get_dict()
+    if not dict then
+        return {}
+    end
+
+    local keys = dict:get_keys(0)
+    local services = core.table.new(0, #keys)
+    local prefix = "default/"
+    for i, key in ipairs(keys) do
+        -- only return default registry nodes, strip prefix for BC
+        if core.string.has_prefix(key, prefix) then
+            local value = dict:get(key)
+            if value then
+                local nodes, err = core.json.decode(value)
+                if nodes then
+                    local bare_key = key:sub(#prefix + 1)
+                    services[bare_key] = nodes
+                else
+                    log.error("failed to decode nodes for service: ", key, ", 
error: ", err)
+                end
+            end
+        end
 
-    for _, v in pairs(consul_conf.servers) do
-        local scheme, host, port, path = unpack(http.parse_uri(nil, v))
-        if scheme ~= "http" then
-            return nil, "only support consul http schema address, eg: 
http://address:port";
-        elseif path ~= "/" or core.string.has_suffix(v, '/') then
-            return nil, "invalid consul server address, the valid format: 
http://address:port";
+        if i % 100 == 0 then
+            ngx.sleep(0)
         end
-        core.table.insert(consul_server_list, {
-            host = host,
-            port = port,
-            token = consul_conf.token,
-            connect_timeout = consul_conf.timeout.connect,
-            read_timeout = consul_conf.timeout.read,
-            wait_timeout = consul_conf.timeout.wait,
-            consul_watch_catalog_url = "/catalog/services",
-            consul_sub_url = "/health/service",
-            consul_watch_health_url = "/health/state/any",
-            consul_server_url = v .. "/v1",
-            weight = consul_conf.weight,
-            keepalive = consul_conf.keepalive,
-            health_index = 0,
-            catalog_index = 0,
-            fetch_interval = consul_conf.fetch_interval -- fetch interval to 
next connect consul
-        })
     end
-    return consul_server_list, nil
+    return services
 end
 
 
+-- ─── Initialization ───────────────────────────────────────────────────
+
 function _M.init_worker()
-    local consul_conf = local_conf.discovery.consul
-    dump_params = consul_conf.dump
+    local consul_conf = local_conf.discovery and local_conf.discovery.consul
+    if not consul_conf then
+        return
+    end
 
-    default_weight = consul_conf.weight
-    sort_type = consul_conf.sort_type
-    -- set default service, used when the server node cannot be found
-    if consul_conf.default_service then
-        default_service = consul_conf.default_service
-        default_service.weight = default_weight
+    local dict = ngx.shared[dict_name]
+    if not dict then
+        error('lua_shared_dict "' .. dict_name .. '" not configured')
     end
+    consul_dict = dict
 
     if process.type() ~= "privileged agent" then
         return
     end
 
-    -- flush stale data that may persist across reloads,
-    -- since consul_services is re-initialized empty
-    consul_dict:flush_all()
-
-    if consul_conf.dump then
-        if consul_conf.dump.load_on_init then
-            read_dump_services()
-        end
-    end
+    -- flush stale data that may persist across reloads
+    dict:flush_all()
 
     log.notice("consul_conf: ", json_delay_encode(consul_conf, true))
 
-    if consul_conf.skip_services then
-        skip_service_map = core.table.new(0, #consul_conf.skip_services)
-        for _, v in ipairs(consul_conf.skip_services) do
-            skip_service_map[v] = true
-        end
-    end
-    -- set up default skip service
-    for _, v in ipairs(default_skip_services) do
-        skip_service_map[v] = true
-    end
-
-    local consul_servers_list, err = format_consul_params(consul_conf)
-    if err then
-        error("format consul config got error: " .. err)
+    -- shallow copy to avoid mutating cached config
+    local conf = {}
+    for k, v in pairs(consul_conf) do
+        conf[k] = v
     end
-    log.info("consul_server_list: ", json_delay_encode(consul_servers_list, 
true))
-
-    consul_services = core.table.new(0, 1)
-    -- success or failure
-    for _, server in ipairs(consul_servers_list) do
-        local ok, err = ngx_timer_at(0, _M.connect, server)
-        if not ok then
-            error("create consul got error: " .. err)
-        end
+    conf.id = "default"
 
-        if server.keepalive == false then
-            ngx_timer_every(server.fetch_interval, _M.connect, server)
-        end
-    end
+    local reg = _M.create_registry(conf)
+    _M.start_registry(reg)
 end
 
 
 function _M.dump_data()
-    return {config = local_conf.discovery.consul, services = _M.all_nodes()}
+    return {config = local_conf.discovery and local_conf.discovery.consul,
+            services = _M.all_nodes()}
 end
 
 
 function _M.control_api()
+    local default_reg = registries["default"]
     return {
         {
             methods = {"GET"},
             uris = {"/show_dump_file"},
-            handler = show_dump_file,
+            handler = function()
+                return show_dump_file(default_reg)
+            end,

Review Comment:
   Fixed — default_reg is now resolved inside the handler closure.



##########
apisix/discovery/consul/init.lua:
##########
@@ -252,496 +223,410 @@ local function show_dump_file()
 end
 
 
-local function get_retry_delay(retry_delay)
-    if not retry_delay or retry_delay >= max_retry_time then
-        retry_delay = 1
-    else
-        retry_delay = retry_delay * 4
+-- ─── polling loop ─────────────────────────────────────────────────────
+
+local function check_keepalive(reg, consul_server, retry_delay)
+    if reg.stop_flag then
+        return
     end
 
-    return retry_delay
+    if consul_server.keepalive and not exiting() then
+        local ok, err = ngx_timer_at(0, _M.connect, reg, consul_server, retry_delay)
+        if not ok then
+            log.error("create ngx_timer_at got error: ", err)
+            return
+        end
+    end
 end
 
 
-local function get_opts(consul_server, is_catalog)
-    local opts = {
-        host = consul_server.host,
-        port = consul_server.port,
-        connect_timeout = consul_server.connect_timeout,
-        read_timeout = consul_server.read_timeout,
-        default_args = {
-            token = consul_server.token,
-        }
-    }
-    if not consul_server.keepalive then
-        return opts
+function _M.connect(premature, reg, consul_server, retry_delay)
+    if premature or reg.stop_flag then
+        return
     end
 
-    opts.default_args.wait = consul_server.wait_timeout --blocked wait!=0; unblocked by wait=0
+    local catalog_thread, spawn_catalog_err = thread_spawn(consul_client.watch_catalog,
+                                                           consul_server)
+    if not catalog_thread then
+        local random_delay = math_random(default_random_range)
+        log.error("failed to spawn thread watch catalog: ", spawn_catalog_err,
+            ", retry connecting consul after ", random_delay, " seconds")
+        core_sleep(random_delay)
 
-    if is_catalog then
-        opts.default_args.index = consul_server.catalog_index
-    else
-        opts.default_args.index = consul_server.health_index
+        check_keepalive(reg, consul_server, retry_delay)
+        return
     end
 
-    return opts
-end
-
-
-local function watch_catalog(consul_server)
-    local client = resty_consul:new(get_opts(consul_server, true))
-
-    ::RETRY::
-    local watch_result, watch_err = client:get(consul_server.consul_watch_catalog_url)
-    local watch_error_info = (watch_err ~= nil and watch_err)
-                             or ((watch_result ~= nil and watch_result.status ~= 200)
-                             and watch_result.status)
-    if watch_error_info then
-        log.error("connect consul: ", consul_server.consul_server_url,
-            " by sub url: ", consul_server.consul_watch_catalog_url,
-            ", got watch result: ", json_delay_encode(watch_result),
-            ", with error: ", watch_error_info)
+    local health_thread, err = thread_spawn(consul_client.watch_health, consul_server)
+    if not health_thread then
+        thread_kill(catalog_thread)
+        local random_delay = math_random(default_random_range)
+        log.error("failed to spawn thread watch health: ", err,
+            ", retry connecting consul after ", random_delay, " seconds")
+        core_sleep(random_delay)
 
-        return watch_type_catalog, default_catalog_error_index
+        check_keepalive(reg, consul_server, retry_delay)
+        return
     end
 
-    if consul_server.catalog_index > 0
-            and consul_server.catalog_index == tonumber(watch_result.headers['X-Consul-Index']) then
+    local thread_wait_ok, watch_type, index = thread_wait(catalog_thread, health_thread)
+    thread_kill(catalog_thread)
+    thread_kill(health_thread)
+    if not thread_wait_ok then
         local random_delay = math_random(default_random_range)
-        log.info("watch catalog has no change, re-watch consul after ", random_delay, " seconds")
+        log.error("failed to wait thread: ", watch_type, ", retry connecting consul after ",
+                random_delay, " seconds")
         core_sleep(random_delay)
-        goto RETRY
+
+        check_keepalive(reg, consul_server, retry_delay)
+        return
     end
 
-    return watch_type_catalog, watch_result.headers['X-Consul-Index']
-end
+    if not consul_client.watch_result_is_valid(tonumber(watch_type),
+            tonumber(index), consul_server.catalog_index, consul_server.health_index) then
+        retry_delay = consul_client.get_retry_delay(retry_delay)
+        log.warn("get all svcs got err, retry connecting consul after ", retry_delay, " seconds")
+        core_sleep(retry_delay)
 
+        check_keepalive(reg, consul_server, retry_delay)
+        return
+    end
 
-local function watch_health(consul_server)
-    local client = resty_consul:new(get_opts(consul_server, false))
+    if reg.stop_flag then
+        return
+    end
 
-    ::RETRY::
-    local watch_result, watch_err = client:get(consul_server.consul_watch_health_url)
-    local watch_error_info = (watch_err ~= nil and watch_err)
-            or ((watch_result ~= nil and watch_result.status ~= 200)
-            and watch_result.status)
-    if watch_error_info then
-        log.error("connect consul: ", consul_server.consul_server_url,
-            " by sub url: ", consul_server.consul_watch_health_url,
-            ", got watch result: ", json_delay_encode(watch_result),
-            ", with error: ", watch_error_info)
+    local up_services, fetch_err, new_catalog_index, new_health_index =
+        consul_client.fetch_services_from_server(consul_server, {
+            default_weight    = reg.conf.weight,
+            sort_type         = reg.conf.sort_type,
+            skip_service_map  = reg.skip_service_map,
+            preserve_metadata = reg.preserve_metadata,
+            key_builder       = reg.key_builder,
+        })
 
-        return watch_type_health, default_health_error_index
-    end
+    if fetch_err then
+        retry_delay = consul_client.get_retry_delay(retry_delay)
+        log.warn("get all svcs got err: ", fetch_err,
+                 ", retry connecting consul after ", retry_delay, " seconds")
+        core_sleep(retry_delay)
 
-    if consul_server.health_index > 0
-            and consul_server.health_index == tonumber(watch_result.headers['X-Consul-Index']) then
-        local random_delay = math_random(default_random_range)
-        log.info("watch health has no change, re-watch consul after ", random_delay, " seconds")
-        core_sleep(random_delay)
-        goto RETRY
+        check_keepalive(reg, consul_server, retry_delay)
+        return
     end
 
-    return watch_type_health, watch_result.headers['X-Consul-Index']
-end
+    if reg.stop_flag then
+        return
+    end
 
+    -- only update if there are actual services (empty table means no index change)
+    if next(up_services) then
+        update_all_services(reg, consul_server.consul_server_url, up_services)
 
-local function check_keepalive(consul_server, retry_delay)
-    if consul_server.keepalive and not exiting() then
-        local ok, err = ngx_timer_at(0, _M.connect, consul_server, retry_delay)
-        if not ok then
-            log.error("create ngx_timer_at got error: ", err)
-            return
+        if reg.dump_params then
+            ngx_timer_at(0, write_dump_services, reg)
         end
     end
+
+    consul_client.update_index(consul_server, new_catalog_index, new_health_index)
+    check_keepalive(reg, consul_server, retry_delay)
 end
 
 
-local function update_index(consul_server, catalog_index, health_index)
-    local c_index = 0
-    local h_index = 0
-    if catalog_index ~= nil then
-        c_index = tonumber(catalog_index)
-    end
+-- ─── Registry management API ──────────────────────────────────────────
 
-    if health_index ~= nil then
-        h_index = tonumber(health_index)
+--- Create a consul registry instance.
+---
+--- conf fields: id, servers (array of URLs), token, timeout, weight,
+---              keepalive, fetch_interval, sort_type, skip_services,
+---              dump, default_service, shared_size
+---
+--- options: service_scanner (function), preserve_metadata (bool),
+---          key_builder (function(service_name) -> string)
+function _M.create_registry(conf, options)
+    options = options or {}
+    local id = conf.id
+    if not id or id == "" then
+        return nil, "registry id is required"
     end
 
-    if c_index > 0 then
-        consul_server.catalog_index = c_index
+    if registries[id] then
+        _M.stop_registry(id)
     end
 
-    if h_index > 0 then
-        consul_server.health_index = h_index
+    -- build skip service map
+    local skip_map = core.table.new(0, 1)
+    if conf.skip_services then
+        for _, v in ipairs(conf.skip_services) do
+            skip_map[v] = true
+        end
     end
+    for _, v in ipairs(default_skip_services) do
+        skip_map[v] = true
+    end
+
+    -- build default_service
+    local default_svc
+    if conf.default_service then
+        default_svc = conf.default_service
+        default_svc.weight = conf.weight
+    end
+
+    local reg = {
+        id                = id,
+        conf              = conf,
+        stop_flag         = false,
+        preserve_metadata = options.preserve_metadata or false,
+        key_builder       = options.key_builder or default_key_builder(id),
+        service_scanner   = options.service_scanner or consul_client.get_consul_services,
+        skip_service_map  = skip_map,
+        default_service   = default_svc,
+        dump_params       = conf.dump,
+        consul_services   = core.table.new(0, 1),
+    }
+
+    registries[id] = reg
+    return reg
 end
 
 
-local function is_not_empty(value)
-    if value == nil or value == null
-            or (type(value) == "table" and not next(value))
-            or (type(value) == "string" and value == "")
-    then
-        return false
+function _M.start_registry(reg)
+    local dict = get_dict()
+    if not dict then
+        error('lua_shared_dict "' .. dict_name .. '" not configured')
     end
 
-    return true
-end
+    -- flush stale data for this registry
+    local prefix = reg.id .. "/"
+    local all_keys = dict:get_keys(0)
+    for _, key in ipairs(all_keys) do
+        if core.string.has_prefix(key, prefix) then
+            dict:delete(key)
+        end
+    end
 
+    -- load dump file if configured
+    if reg.dump_params and reg.dump_params.load_on_init then
+        read_dump_services(reg)
+    end
 
-local function watch_result_is_valid(watch_type, index, catalog_index, health_index)
-    if index <= 0 then
-        return false
+    local consul_servers_list, err = consul_client.format_consul_params(reg.conf)
+    if err then
+        error("format consul config got error: " .. err)
     end
+    log.info("consul_server_list: ", json_delay_encode(consul_servers_list, true))
 
-    if watch_type == watch_type_catalog then
-        if index == catalog_index then
-            return false
+    for _, server in ipairs(consul_servers_list) do
+        local ok, timer_err = ngx_timer_at(0, _M.connect, reg, server)
+        if not ok then
+            error("create consul got error: " .. timer_err)
         end
-    else
-        if index == health_index then
-            return false
+
+        if server.keepalive == false then
+            ngx_timer_every(server.fetch_interval, _M.connect, reg, server)
         end

Review Comment:
   Fixed — replaced ngx.timer.every with self-rescheduling ngx.timer.at in 
check_keepalive, so stop_flag can halt future wakeups.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to