This is an automated email from the ASF dual-hosted git repository.

nic443 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix.git


The following commit(s) were added to refs/heads/master by this push:
     new 5962e73e7 refactor: extract reusable building blocks from K8s and 
Nacos discovery (#13201)
5962e73e7 is described below

commit 5962e73e7cbef1387b061ba6892b8c98e258da45
Author: Nic <[email protected]>
AuthorDate: Tue Apr 14 12:29:35 2026 +0800

    refactor: extract reusable building blocks from K8s and Nacos discovery 
(#13201)
---
 apisix/discovery/kubernetes/core.lua             | 752 +++++++++++++++++++++++
 apisix/discovery/kubernetes/informer_factory.lua |  34 +-
 apisix/discovery/kubernetes/init.lua             | 679 +-------------------
 apisix/discovery/kubernetes/schema.lua           |   6 -
 apisix/discovery/nacos/client.lua                | 365 +++++++++++
 apisix/discovery/nacos/init.lua                  | 517 ++++++----------
 t/APISIX.pm                                      |   1 +
 t/discovery/nacos2.t                             |   4 +-
 8 files changed, 1381 insertions(+), 977 deletions(-)

diff --git a/apisix/discovery/kubernetes/core.lua 
b/apisix/discovery/kubernetes/core.lua
new file mode 100644
index 000000000..1ba74408d
--- /dev/null
+++ b/apisix/discovery/kubernetes/core.lua
@@ -0,0 +1,752 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+--- Reusable building blocks for Kubernetes service discovery.
+--- Extracted from init.lua so that both static-config mode and
+--- dynamic-config mode can share the same core logic.
+
+local ngx          = ngx
+local ipairs        = ipairs
+local pairs         = pairs
+local unpack        = unpack
+local string        = string
+local tonumber      = tonumber
+local tostring      = tostring
+local os            = os
+local pcall         = pcall
+local setmetatable  = setmetatable
+
+local core = require("apisix.core")
+local util = require("apisix.cli.util")
+local default_informer_factory = 
require("apisix.discovery.kubernetes.informer_factory")
+
+
+local _M = {}
+
+local endpoint_buffer = {}
+local kubernetes_service_name_label = "kubernetes.io/service-name"
+
+
+-- ─── helpers ──────────────────────────────────────────────────────────
+
+-- Ordering used when sorting node lists: compare by host first and fall
+-- back to the port when both hosts are equal.
+local function sort_nodes_cmp(left, right)
+    local left_host, right_host = left.host, right.host
+    if left_host == right_host then
+        return left.port < right.port
+    end
+    return left_host < right_host
+end
+
+
+-- Compose the shared-dict key of a service: "<namespace>/<name>", preceded
+-- by "<key_prefix>/" when a non-empty prefix is given.
+local function build_endpoint_key(key_prefix, namespace, name)
+    local key = namespace .. "/" .. name
+    if not key_prefix or key_prefix == "" then
+        return key
+    end
+    return key_prefix .. "/" .. key
+end
+
+
+-- ─── config parsing (exported) ────────────────────────────────────────
+
+--- Resolve a config value that may reference an environment variable.
+--- A value longer than three characters that starts with "${" and ends
+--- with "}" is replaced by the content of the named environment variable;
+--- every other value is returned unchanged.
+--- Returns the resolved value, or nil plus an error message when the
+--- referenced variable is not set.
+function _M.read_env(key)
+    local len = #key
+    if len <= 3 then
+        return key
+    end
+
+    local dollar, brace = string.byte(key, 1, 2)
+    local is_reference = dollar == string.byte('$')
+                         and brace == string.byte('{')
+                         and string.byte(key, len) == string.byte('}')
+    if not is_reference then
+        return key
+    end
+
+    local env = string.sub(key, 3, len - 1)
+    local value = os.getenv(env)
+    if value then
+        return value
+    end
+    return nil, "not found environment variable " .. env
+end
+
+
+--- Read the content of token_file through util.read_file and pass it
+--- through util.trim.
+--- Returns the trimmed token, or nil plus the error reported by
+--- util.read_file.
+function _M.read_token(token_file)
+    local content, read_err = util.read_file(token_file)
+    if not read_err then
+        return util.trim(content)
+    end
+    return nil, read_err
+end
+
+
+--- Build the API-server connection descriptor from the discovery config.
+---
+--- Reads conf.service.{schema,host,port,ssl_verify} and
+--- conf.client.{token,token_file}; host, port, token and token_file are
+--- passed through _M.read_env, so they may use the "${ENV}" form.
+---
+--- Returns a table with schema, host, port (number), token and ssl_verify,
+--- or nil plus an error message. When client.token_file is used the token
+--- is not stored in the table: it is re-read from the file on every access
+--- of `.token`, and resolves to "" when the file cannot be read.
+function _M.get_apiserver(conf)
+    local apiserver = {
+        schema = "",
+        host   = "",
+        port   = "",
+    }
+
+    apiserver.schema = conf.service.schema
+    if apiserver.schema ~= "http" and apiserver.schema ~= "https" then
+        -- tostring() so a missing schema is reported through the error
+        -- return instead of raising a concatenation error
+        return nil, "service.schema should set to one of [http,https] but "
+                    .. tostring(apiserver.schema)
+    end
+
+    local err
+    apiserver.host, err = _M.read_env(conf.service.host)
+    if err then
+        return nil, err
+    end
+    if apiserver.host == "" then
+        return nil, "service.host should set to non-empty string"
+    end
+
+    local port
+    port, err = _M.read_env(conf.service.port)
+    if err then
+        return nil, err
+    end
+    apiserver.port = tonumber(port)
+    if not apiserver.port or apiserver.port <= 0 or apiserver.port > 65535 then
+        -- show the configured value; the converted one is nil when parsing failed
+        return nil, "invalid port value: " .. tostring(port)
+    end
+
+    if conf.client.token then
+        local token
+        token, err = _M.read_env(conf.client.token)
+        if err then
+            return nil, err
+        end
+        apiserver.token = util.trim(token)
+    elseif conf.client.token_file and conf.client.token_file ~= "" then
+        -- `token` is never stored as a raw field, so every read of
+        -- apiserver.token lands in __index and re-reads the file.
+        setmetatable(apiserver, {
+            __index = function(_, key)
+                if key ~= "token" then
+                    return
+                end
+                -- errors are kept in locals so that the closure does not
+                -- write to the `err` variable of the enclosing function
+                local token_file, path_err = _M.read_env(conf.client.token_file)
+                if path_err then
+                    core.log.error("failed to read token file path: ", path_err)
+                    -- same result as a failed file read, so the https
+                    -- empty-token check below also covers this case
+                    return ""
+                end
+                local token, read_err = _M.read_token(token_file)
+                if read_err then
+                    core.log.error("failed to read token from file: ", read_err)
+                    return ""
+                end
+                core.log.debug("re-read the token value")
+                return token
+            end
+        })
+    else
+        return nil, "one of [client.token,client.token_file] should be set but none"
+    end
+
+    if apiserver.schema == "https" and apiserver.token == "" then
+        return nil, "apiserver.token should set to non-empty string when service.schema is https"
+    end
+
+    -- ssl_verify: use explicit config if set, otherwise default to false
+    if conf.service.ssl_verify ~= nil then
+        apiserver.ssl_verify = conf.service.ssl_verify
+    else
+        apiserver.ssl_verify = false
+    end
+
+    return apiserver
+end
+
+
+--- Configure namespace filtering on an informer from conf.namespace_selector.
+---
+--- The first selector found, in the order equal, not_equal, match,
+--- not_match, is applied:
+---   equal / not_equal  set informer.field_selector and clear
+---                      informer.namespace_selector.
+---   match / not_match  install informer.namespace_selector, a predicate
+---                      (self, namespace) -> boolean. A pattern only counts
+---                      when it matches the whole namespace string.
+--- Without a namespace_selector the predicate is cleared.
+function _M.setup_namespace_selector(conf, informer)
+    local ns = conf.namespace_selector
+    if ns == nil then
+        informer.namespace_selector = nil
+        return
+    end
+
+    if ns.equal then
+        informer.field_selector = "metadata.namespace=" .. ns.equal
+        informer.namespace_selector = nil
+        return
+    end
+
+    if ns.not_equal then
+        informer.field_selector = "metadata.namespace!=" .. ns.not_equal
+        informer.namespace_selector = nil
+        return
+    end
+
+    if ns.match then
+        informer.namespace_selector = function(self, namespace)
+            -- the pattern list is looked up on every call
+            local match = conf.namespace_selector.match
+            local m, err
+            for _, v in ipairs(match) do
+                m, err = ngx.re.match(namespace, v, "jo")
+                if m and m[0] == namespace then
+                    return true
+                end
+                if err then
+                    core.log.error("ngx.re.match failed: ", err)
+                end
+            end
+            return false
+        end
+        return
+    end
+
+    if ns.not_match then
+        informer.namespace_selector = function(self, namespace)
+            local not_match = conf.namespace_selector.not_match
+            local m, err
+            for _, v in ipairs(not_match) do
+                m, err = ngx.re.match(namespace, v, "jo")
+                if m and m[0] == namespace then
+                    return false
+                end
+                if err then
+                    -- a pattern that cannot be evaluated still rejects the
+                    -- namespace, but the reason is now logged like in the
+                    -- `match` branch
+                    core.log.error("ngx.re.match failed: ", err)
+                    return false
+                end
+            end
+            return true
+        end
+        return
+    end
+end
+
+
+--- Copy conf.label_selector onto the informer (nil when it is not set).
+function _M.setup_label_selector(conf, informer)
+    local selector = conf.label_selector
+    informer.label_selector = selector
+end
+
+
+-- ─── endpoint dict operations (exported) ──────────────────────────────
+
+--- Store the endpoints of one service in handle.endpoint_dict.
+---
+--- Two entries are written: the JSON-encoded endpoints under endpoint_key
+--- and the CRC32 of that JSON under endpoint_key .. "#version".
+---
+--- The content is written before the version. _M.resolve_nodes reads the
+--- version first and loads the content only on a cache miss, so with this
+--- order a reader that sees the new version always finds the matching
+--- content; publishing the version first lets a reader cache the previous
+--- content under the new version.
+---
+--- Returns true, or false plus an error message. After a failure neither
+--- key is left half-updated: the counterpart key is removed.
+function _M.update_endpoint_dict(handle, endpoints, endpoint_key)
+    local endpoint_content = core.json.encode(endpoints, true)
+    local endpoint_version = ngx.crc32_long(endpoint_content)
+    local version_key = endpoint_key .. "#version"
+    local dict = handle.endpoint_dict
+
+    local _, err = dict:safe_set(endpoint_key, endpoint_content)
+    if err then
+        dict:delete(version_key)
+        return false, "set endpoint into discovery DICT failed, " .. err
+    end
+
+    _, err = dict:safe_set(version_key, endpoint_version)
+    if err then
+        -- do not leave content behind that no version entry refers to
+        dict:delete(endpoint_key)
+        return false, "set endpoint version into discovery DICT failed, " .. err
+    end
+    return true
+end
+
+
+--- Loader passed to the LRU cache: fetch the JSON document stored under
+--- endpoint_key, decode it and return the entry for endpoint_port.
+--- Returns nil, after logging an error, when the dict entry is missing or
+--- cannot be decoded.
+function _M.create_endpoint_lrucache(endpoint_dict, endpoint_key, endpoint_port)
+    local raw = endpoint_dict:get(endpoint_key)
+    if raw then
+        local decoded = core.json.decode(raw)
+        if decoded then
+            return decoded[endpoint_port]
+        end
+        core.log.error("decode endpoint content failed, this should not happen, content: ",
+                raw)
+        return nil
+    end
+
+    core.log.error("get empty endpoint content from discovery DIC, this should not happen ",
+            endpoint_key)
+    return nil
+end
+
+
+-- ─── endpoint callback factory ────────────────────────────────────────
+--- Create a set of informer callbacks parameterized by options.
+---
+--- options:
+---   key_prefix            (string|nil) prefix for endpoint dict keys, used for
+---                         multi-registry isolation on a shared dict.
+---   duplicate_port_number (bool|nil)   when true, store nodes under the numeric
+---                         port key in addition to the port name key.
+---
+--- Returns a table: {
+---   on_endpoint_modified, on_endpoint_deleted,
+---   on_endpoint_slices_modified, on_endpoint_slices_deleted,
+---   pre_list, post_list
+--- }
+--- Each callback receives the discovery handle as its first argument and
+--- works on handle.endpoint_dict; the EndpointSlice callbacks additionally
+--- use handle.endpoint_slices_cache.
+
+function _M.create_endpoint_callbacks(options)
+    options = options or {}
+    local key_prefix = options.key_prefix
+    local dup_port = options.duplicate_port_number
+
+    -- ── shared helpers ──
+
+    -- A JSON null decodes to ngx.null, which is truthy and is rejected by
+    -- ipairs(); treat it like a missing list.
+    local function list_or_empty(value)
+        if not value or value == ngx.null then
+            return {}
+        end
+        return value
+    end
+
+    -- Key under which the nodes of a port are stored: the port name when
+    -- present, otherwise targetPort, otherwise the port number (as strings).
+    local function get_port_name(port)
+        if port.name then
+            return port.name
+        end
+        if port.targetPort then
+            return tostring(port.targetPort)
+        end
+        return tostring(port.port)
+    end
+
+    -- ── EndpointSlice helpers ──
+
+    -- Remember (or, with slice == nil, forget) the nodes contributed by one
+    -- EndpointSlice of the service identified by endpoint_key.
+    local function update_endpoint_slices_cache(handle, endpoint_key, slice, slice_name)
+        if not handle.endpoint_slices_cache[endpoint_key] then
+            handle.endpoint_slices_cache[endpoint_key] = {}
+        end
+        handle.endpoint_slices_cache[endpoint_key][slice_name] = slice
+    end
+
+    -- Merge the cached slices of one service into a single
+    -- port name -> node list table.
+    local function get_endpoints_from_cache(handle, endpoint_key)
+        local endpoint_slices = handle.endpoint_slices_cache[endpoint_key] or {}
+        local endpoints = {}
+        for _, endpoint_slice in pairs(endpoint_slices) do
+            for port, targets in pairs(endpoint_slice) do
+                if not endpoints[port] then
+                    -- node lists are arrays: presize the array part
+                    endpoints[port] = core.table.new(#targets, 0)
+                end
+                core.table.insert_tail(endpoints[port], unpack(targets))
+            end
+        end
+        return endpoints
+    end
+
+    -- Check that the slice carries the metadata needed to build its key.
+    -- Returns true, or false plus a message that embeds the encoded slice.
+    local function validate_endpoint_slice(endpoint_slice)
+        if not endpoint_slice.metadata then
+            return false, "endpoint_slice has no metadata, endpointSlice: "
+                    .. core.json.encode(endpoint_slice)
+        end
+        if not endpoint_slice.metadata.name then
+            return false, "endpoint_slice has no metadata.name, endpointSlice: "
+                    .. core.json.encode(endpoint_slice)
+        end
+        if not endpoint_slice.metadata.namespace then
+            return false, "endpoint_slice has no metadata.namespace, endpointSlice: "
+                    .. core.json.encode(endpoint_slice)
+        end
+        if not endpoint_slice.metadata.labels
+                or not endpoint_slice.metadata.labels[kubernetes_service_name_label] then
+            return false, "endpoint_slice has no service-name, endpointSlice: "
+                    .. core.json.encode(endpoint_slice)
+        end
+        return true
+    end
+
+    -- ── callbacks ──
+
+    -- Added/modified EndpointSlice: rebuild the nodes of this slice, merge
+    -- them with the other cached slices of the service and store the result.
+    local function on_endpoint_slices_modified(handle, endpoint_slice, operate)
+        local ok, err = validate_endpoint_slice(endpoint_slice)
+        if not ok then
+            core.log.error("endpoint_slice validation fail: ", err)
+            return
+        end
+        if handle.namespace_selector and
+                not handle:namespace_selector(endpoint_slice.metadata.namespace) then
+            return
+        end
+
+        core.log.debug("get endpoint_slice: ", core.json.delay_encode(endpoint_slice))
+        local port_to_nodes = {}
+
+        local slice_endpoints = list_or_empty(endpoint_slice.endpoints)
+        -- `ports` needs the same null handling as `endpoints`: `x or {}`
+        -- does not replace ngx.null
+        local slice_ports = list_or_empty(endpoint_slice.ports)
+
+        for _, ep in ipairs(slice_endpoints) do
+            -- only endpoints whose ready condition is set contribute nodes
+            if ep.addresses and ep.conditions and ep.conditions.ready then
+                local addresses = ep.addresses
+                for _, port in ipairs(slice_ports) do
+                    local port_name = get_port_name(port)
+
+                    local nodes = port_to_nodes[port_name]
+                    if nodes == nil then
+                        nodes = core.table.new(#slice_endpoints * #addresses, 0)
+                        port_to_nodes[port_name] = nodes
+                    end
+
+                    for _, ip in ipairs(addresses) do
+                        core.table.insert(nodes, {
+                            host = ip,
+                            port = port.port,
+                            weight = handle.default_weight
+                        })
+                    end
+
+                    if dup_port and port.name then
+                        port_to_nodes[tostring(port.port)] = core.table.deepcopy(nodes)
+                    end
+                end
+            end
+        end
+
+        local svc_name = endpoint_slice.metadata.labels[kubernetes_service_name_label]
+        local endpoint_key = build_endpoint_key(
+            key_prefix, endpoint_slice.metadata.namespace, svc_name)
+        update_endpoint_slices_cache(
+            handle, endpoint_key, port_to_nodes, endpoint_slice.metadata.name)
+
+        local cached_endpoints = get_endpoints_from_cache(handle, endpoint_key)
+        for _, nodes in pairs(cached_endpoints) do
+            core.table.sort(nodes, sort_nodes_cmp)
+        end
+
+        ok, err = _M.update_endpoint_dict(handle, cached_endpoints, endpoint_key)
+        if not ok then
+            core.log.error("failed to update endpoint dict for endpoint: ", endpoint_key,
+                    ", err: ", err)
+            return
+        end
+        -- during a full list, mark the keys as live so post_list keeps them
+        if operate == "list" then
+            handle.current_keys_hash[endpoint_key] = true
+            handle.current_keys_hash[endpoint_key .. "#version"] = true
+        end
+    end
+
+    -- Deleted EndpointSlice: drop the slice from the cache and store what
+    -- the remaining slices of the service still provide.
+    local function on_endpoint_slices_deleted(handle, endpoint_slice)
+        local ok, err = validate_endpoint_slice(endpoint_slice)
+        if not ok then
+            core.log.error("endpoint_slice validation fail: ", err)
+            return
+        end
+        if handle.namespace_selector and
+                not handle:namespace_selector(endpoint_slice.metadata.namespace) then
+            return
+        end
+
+        core.log.debug("delete endpoint_slice: ", core.json.delay_encode(endpoint_slice))
+
+        local svc_name = endpoint_slice.metadata.labels[kubernetes_service_name_label]
+        local endpoint_key = build_endpoint_key(
+            key_prefix, endpoint_slice.metadata.namespace, svc_name)
+        update_endpoint_slices_cache(handle, endpoint_key, nil, endpoint_slice.metadata.name)
+
+        local cached_endpoints = get_endpoints_from_cache(handle, endpoint_key)
+        for _, nodes in pairs(cached_endpoints) do
+            core.table.sort(nodes, sort_nodes_cmp)
+        end
+
+        ok, err = _M.update_endpoint_dict(handle, cached_endpoints, endpoint_key)
+        if not ok then
+            core.log.error("failed to update endpoint dict for endpoint: ", endpoint_key,
+                    ", err: ", err)
+        end
+    end
+
+    -- Added/modified Endpoints object: rebuild the node lists from its
+    -- subsets and store them.
+    local function on_endpoint_modified(handle, endpoint, operate)
+        if not endpoint or not endpoint.metadata
+                or not endpoint.metadata.namespace or not endpoint.metadata.name then
+            core.log.warn("skipping endpoint with missing metadata: ",
+                    core.json.delay_encode(endpoint))
+            return
+        end
+
+        if handle.namespace_selector and
+                not handle:namespace_selector(endpoint.metadata.namespace) then
+            return
+        end
+
+        core.log.debug(core.json.delay_encode(endpoint))
+        -- endpoint_buffer is a module-level scratch table reused between calls
+        core.table.clear(endpoint_buffer)
+
+        local subsets = list_or_empty(endpoint.subsets)
+        for _, subset in ipairs(subsets) do
+            if subset.addresses then
+                local addresses = subset.addresses
+                for _, port in ipairs(list_or_empty(subset.ports)) do
+                    local port_name = get_port_name(port)
+
+                    local nodes = endpoint_buffer[port_name]
+                    if nodes == nil then
+                        nodes = core.table.new(#subsets * #addresses, 0)
+                        endpoint_buffer[port_name] = nodes
+                    end
+
+                    for _, address in ipairs(addresses) do
+                        core.table.insert(nodes, {
+                            host = address.ip,
+                            port = port.port,
+                            weight = handle.default_weight
+                        })
+                    end
+
+                    if dup_port and port.name then
+                        endpoint_buffer[tostring(port.port)] = core.table.deepcopy(nodes)
+                    end
+                end
+            end
+        end
+
+        for _, nodes in pairs(endpoint_buffer) do
+            core.table.sort(nodes, sort_nodes_cmp)
+        end
+
+        local endpoint_key = build_endpoint_key(
+            key_prefix, endpoint.metadata.namespace, endpoint.metadata.name)
+        local ok, err = _M.update_endpoint_dict(handle, endpoint_buffer, endpoint_key)
+        if not ok then
+            core.log.error("failed to update endpoint dict for endpoint: ", endpoint_key,
+                    ", err: ", err)
+            return
+        end
+        -- during a full list, mark the keys as live so post_list keeps them
+        if operate == "list" then
+            handle.current_keys_hash[endpoint_key] = true
+            handle.current_keys_hash[endpoint_key .. "#version"] = true
+        end
+    end
+
+    -- Deleted Endpoints object: remove both dict entries of the service.
+    local function on_endpoint_deleted(handle, endpoint)
+        if not endpoint or not endpoint.metadata
+                or not endpoint.metadata.namespace or not endpoint.metadata.name then
+            core.log.warn("skipping endpoint deletion with missing metadata: ",
+                    core.json.delay_encode(endpoint))
+            return
+        end
+
+        if handle.namespace_selector and
+                not handle:namespace_selector(endpoint.metadata.namespace) then
+            return
+        end
+
+        core.log.debug(core.json.delay_encode(endpoint))
+        local endpoint_key = build_endpoint_key(
+            key_prefix, endpoint.metadata.namespace, endpoint.metadata.name)
+        handle.endpoint_dict:delete(endpoint_key .. "#version")
+        handle.endpoint_dict:delete(endpoint_key)
+    end
+
+    -- pre_list / post_list are prefix-aware: when key_prefix is set,
+    -- only keys belonging to this prefix are considered for dirty-data cleanup.
+
+    -- Snapshot the keys currently in the dict and reset the per-list state.
+    local function pre_list(handle)
+        handle.current_keys_hash = {}
+        local all_keys = handle.endpoint_dict:get_keys(0)
+        if key_prefix and key_prefix ~= "" then
+            handle.existing_keys = {}
+            local prefix = key_prefix .. "/"
+            for _, key in ipairs(all_keys) do
+                if core.string.has_prefix(key, prefix)
+                        or key == "discovery_ready:" .. key_prefix then
+                    core.table.insert(handle.existing_keys, key)
+                end
+            end
+        else
+            handle.existing_keys = all_keys
+        end
+        if handle.endpoint_slices_cache then
+            handle.endpoint_slices_cache = {}
+        end
+    end
+
+    -- Delete every snapshotted key the list did not touch, then set the
+    -- ready flag ("discovery_ready", or "discovery_ready:<key_prefix>").
+    local function post_list(handle)
+        if handle.existing_keys and handle.current_keys_hash then
+            for _, key in ipairs(handle.existing_keys) do
+                if not handle.current_keys_hash[key] then
+                    core.log.info("kubernetes discovery module found dirty data in shared dict, ",
+                                  "key: ", key)
+                    handle.endpoint_dict:delete(key)
+                end
+            end
+            handle.existing_keys = nil
+            handle.current_keys_hash = nil
+        end
+        local ready_key = (key_prefix and key_prefix ~= "")
+            and ("discovery_ready:" .. key_prefix)
+            or "discovery_ready"
+        local _, err = handle.endpoint_dict:safe_set(ready_key, true)
+        if err then
+            core.log.error("set discovery_ready flag into discovery DICT failed, ", err)
+        end
+    end
+
+    return {
+        on_endpoint_modified        = on_endpoint_modified,
+        on_endpoint_deleted         = on_endpoint_deleted,
+        on_endpoint_slices_modified = on_endpoint_slices_modified,
+        on_endpoint_slices_deleted  = on_endpoint_slices_deleted,
+        pre_list                    = pre_list,
+        post_list                   = post_list,
+    }
+end
+
+
+-- ─── handle factory ───────────────────────────────────────────────────
+--- Create a fully configured Kubernetes discovery handle.
+---
+--- conf: standard kubernetes discovery config (service, client,
+---       namespace_selector, label_selector, default_weight,
+---       watch_endpoint_slices)
+---
+--- options:
+---   endpoint_dict         (ngx.shared.DICT)  required — the shared dict to use
+---   key_prefix            (string|nil)       prefix for endpoint keys
+---   duplicate_port_number (bool|nil)         store nodes under numeric port too
+---   informer_factory      (table|nil)        custom informer factory module
+---
+--- Returns the handle, or nil plus an error message. The handle holds
+--- endpoint_dict, apiserver and default_weight (50 when conf.default_weight
+--- is not set) and falls back to the informer for every other field.
+
+function _M.create_handle(conf, options)
+    -- a missing options table is reported like a missing endpoint_dict
+    -- instead of raising an index error
+    options = options or {}
+    local endpoint_dict = options.endpoint_dict
+    if not endpoint_dict then
+        return nil, "endpoint_dict is required"
+    end
+
+    local apiserver, err = _M.get_apiserver(conf)
+    if err then
+        return nil, err
+    end
+
+    local default_weight = conf.default_weight or 50
+
+    local inf_factory = options.informer_factory or default_informer_factory
+    local endpoints_informer
+    if conf.watch_endpoint_slices then
+        endpoints_informer, err = inf_factory.new(
+            "discovery.k8s.io", "v1", "EndpointSlice", "endpointslices", "")
+    else
+        endpoints_informer, err = inf_factory.new("", "v1", "Endpoints", "endpoints", "")
+    end
+    if err or not endpoints_informer then
+        -- also covers a factory that returns nothing without an error
+        return nil, err or "failed to create informer"
+    end
+
+    _M.setup_namespace_selector(conf, endpoints_informer)
+    _M.setup_label_selector(conf, endpoints_informer)
+
+    local cbs = _M.create_endpoint_callbacks({
+        key_prefix            = options.key_prefix,
+        duplicate_port_number = options.duplicate_port_number,
+    })
+
+    if conf.watch_endpoint_slices then
+        endpoints_informer.on_added    = cbs.on_endpoint_slices_modified
+        endpoints_informer.on_modified = cbs.on_endpoint_slices_modified
+        endpoints_informer.on_deleted  = cbs.on_endpoint_slices_deleted
+        endpoints_informer.endpoint_slices_cache = {}
+    else
+        endpoints_informer.on_added    = cbs.on_endpoint_modified
+        endpoints_informer.on_modified = cbs.on_endpoint_modified
+        endpoints_informer.on_deleted  = cbs.on_endpoint_deleted
+    end
+
+    endpoints_informer.pre_list  = cbs.pre_list
+    endpoints_informer.post_list = cbs.post_list
+
+    -- the handle overlays the informer: fields not set here are looked up
+    -- on the informer through __index
+    local handle = setmetatable({
+        endpoint_dict  = endpoint_dict,
+        apiserver      = apiserver,
+        default_weight = default_weight,
+    }, { __index = endpoints_informer })
+
+    return handle
+end
+
+
+-- ─── lifecycle ────────────────────────────────────────────────────────
+
+--- Start the background list/watch loop of a handle.
+---
+--- A timer calls handle.list_watch(handle, handle.apiserver) and re-arms
+--- itself after each run: immediately when list_watch returned a truthy
+--- status, after 40 seconds when it raised an error or returned a falsy
+--- status. The loop ends once handle.stop is set or the timer fires
+--- prematurely.
+function _M.start_fetch(handle)
+    local timer_runner
+
+    -- Arm the next run. A timer that cannot be created would end the loop
+    -- without any trace, so the failure is logged; failures while the
+    -- worker is shutting down are expected and stay quiet.
+    local function schedule(delay)
+        local ok, err = ngx.timer.at(delay, timer_runner)
+        if not ok and not ngx.worker.exiting() then
+            core.log.error("failed to create list_watch timer, kind: ", handle.kind,
+                    ", err: ", err)
+        end
+    end
+
+    timer_runner = function(premature)
+        if premature then
+            return
+        end
+        if handle.stop then
+            core.log.info("stop fetching, kind: ", handle.kind)
+            return
+        end
+
+        -- pcall keeps a runtime error inside list_watch from ending the loop
+        local ok, status = pcall(handle.list_watch, handle, handle.apiserver)
+
+        local retry_interval = 0
+        if not ok then
+            core.log.error("list_watch failed, kind: ", handle.kind,
+                    ", reason: ", "RuntimeException", ", message : ", status)
+            retry_interval = 40
+        elseif not status then
+            retry_interval = 40
+        end
+
+        if not handle.stop then
+            schedule(retry_interval)
+        end
+    end
+
+    schedule(0)
+end
+
+
+-- ─── node resolution ──────────────────────────────────────────────────
+--- Look up the upstream nodes of a service in a shared dict through an
+--- LRU cache.
+---
+--- endpoint_lrucache: core.lrucache instance
+--- service_name:  the full service name (e.g. "ns/svc:port" or "id/ns/svc:port")
+--- pattern:       regex applied to service_name; its match is handed to
+---                dict_resolver
+--- dict_resolver: function(match) → endpoint_dict, endpoint_key, endpoint_port
+---                returns the dict, key, and port to look up
+---
+--- Returns whatever the cache yields for the current version of the key,
+--- or nil when service_name does not match, no dict is resolved, or the
+--- dict has no "#version" entry for the key.
+
+function _M.resolve_nodes(endpoint_lrucache, service_name, pattern, dict_resolver)
+    local captures = ngx.re.match(service_name, pattern, "jo")
+    if captures then
+        local dict, key, port = dict_resolver(captures)
+        if dict then
+            local version = dict:get(key .. "#version")
+            if version then
+                -- the version is the cache version, so a changed entry is reloaded
+                return endpoint_lrucache(service_name, version,
+                        _M.create_endpoint_lrucache, dict, key, port)
+            end
+            core.log.info("get empty endpoint version from discovery DICT ", key)
+            return nil
+        end
+        core.log.error("failed to resolve endpoint dict for service: ", service_name)
+        return nil
+    end
+
+    core.log.error("get unexpected upstream service_name: ", service_name)
+    return nil
+end
+
+
+-- ─── dict helpers ─────────────────────────────────────────────────────
+
+--- Collect the endpoint entries stored in endpoint_dict.
+--- Keys ending in "#version" and keys starting with "discovery_ready" are
+--- left out. Returns an array of { name = key, value = stored value }, or
+--- nothing when the dict has no keys or listing the keys reports an error.
+function _M.dump_endpoints_from_dict(endpoint_dict)
+    local keys, err = endpoint_dict:get_keys(0)
+    if err then
+        core.log.error("get keys from discovery dict failed: ", err)
+        return
+    end
+
+    if not keys or #keys == 0 then
+        return
+    end
+
+    local version_suffix = "#version"
+    local suffix_len = #version_suffix
+    local dumped = {}
+    for _, key in ipairs(keys) do
+        local is_version_key = string.sub(key, -suffix_len) == version_suffix
+        if not is_version_key
+                and not core.string.has_prefix(key, "discovery_ready") then
+            local value = endpoint_dict:get(key)
+            core.table.insert(dumped, { name = key, value = value })
+        end
+    end
+
+    return dumped
+end
+
+
+return _M
diff --git a/apisix/discovery/kubernetes/informer_factory.lua 
b/apisix/discovery/kubernetes/informer_factory.lua
index 4d650d7bb..0daafef72 100644
--- a/apisix/discovery/kubernetes/informer_factory.lua
+++ b/apisix/discovery/kubernetes/informer_factory.lua
@@ -86,6 +86,9 @@ local function list(httpc, apiserver, informer)
 
     informer.continue = data.metadata.continue
     if informer.continue and informer.continue ~= "" then
+        if informer.stop then
+            return true
+        end
         list(httpc, apiserver, informer)
     end
 
@@ -199,6 +202,10 @@ end
 local function watch(httpc, apiserver, informer)
     local watch_times = 8
     for _ = 1, watch_times do
+        if informer.stop then
+            return true
+        end
+
         local watch_seconds = 1800 + math.random(9, 999)
         informer.overtime = watch_seconds
         local http_seconds = watch_seconds + 120
@@ -231,6 +238,10 @@ local function watch(httpc, apiserver, informer)
         local reason
 
         while true do
+            if informer.stop then
+                return true
+            end
+
             body, err = response.body_reader()
             if err then
                 return false, "ReadBodyError", err
@@ -266,15 +277,25 @@ local function list_watch(informer, apiserver)
     informer.continue = ""
     informer.version = ""
 
+    if informer.stop then
+        return true
+    end
+
     informer.fetch_state = "connecting"
     core.log.info("begin to connect ", apiserver.host, ":", apiserver.port)
 
-    ok, message = httpc:connect({
+    local connect_opts = {
         scheme = apiserver.schema,
         host = apiserver.host,
         port = apiserver.port,
-        ssl_verify = apiserver.ssl_verify
-    })
+        ssl_verify = apiserver.ssl_verify or false,
+    }
+
+    if apiserver.ssl_server_name then
+        connect_opts.ssl_server_name = apiserver.ssl_server_name
+    end
+
+    ok, message = httpc:connect(connect_opts)
 
     if not ok then
         informer.fetch_state = "connect failed"
@@ -298,10 +319,14 @@ local function list_watch(informer, apiserver)
     end
 
     informer.fetch_state = "list finished"
-    if informer.post_list then
+    if informer.post_list and not informer.stop then
         informer:post_list()
     end
 
+    if informer.stop then
+        return true
+    end
+
     core.log.info("begin to watch ", informer.kind)
     informer.fetch_state = "watching"
     ok, reason, message = watch(httpc, apiserver, informer)
@@ -370,6 +395,7 @@ function _M.new(group, version, kind, plural, namespace)
         overtime = "1800",
         version = "",
         continue = "",
+        stop = false,
         list_watch = list_watch
     }
 end
diff --git a/apisix/discovery/kubernetes/init.lua 
b/apisix/discovery/kubernetes/init.lua
index a6cf828e2..f92077923 100644
--- a/apisix/discovery/kubernetes/init.lua
+++ b/apisix/discovery/kubernetes/init.lua
@@ -17,22 +17,15 @@
 
 local ngx = ngx
 local type = type
-local unpack = unpack
 local ipairs = ipairs
 local pairs = pairs
 local string = string
-local tonumber = tonumber
-local tostring = tostring
-local os = os
 local error = error
-local pcall = pcall
-local setmetatable = setmetatable
 local is_http = ngx.config.subsystem == "http"
 local process = require("ngx.process")
 local core = require("apisix.core")
-local util = require("apisix.cli.util")
 local local_conf = require("apisix.core.config_local").local_conf()
-local informer_factory = 
require("apisix.discovery.kubernetes.informer_factory")
+local k8s_core = require("apisix.discovery.kubernetes.core")
 
 
 local ctx
@@ -42,501 +35,12 @@ local endpoint_lrucache = core.lrucache.new({
     count = 1024
 })
 
-local endpoint_buffer = {}
-local kubernetes_service_name_label = "kubernetes.io/service-name"
-
-local function sort_nodes_cmp(left, right)
-    if left.host ~= right.host then
-        return left.host < right.host
-    end
-
-    return left.port < right.port
-end
-
-local function update_endpoint_slices_cache(handle, endpoint_key, slice, 
slice_name)
-    if not handle.endpoint_slices_cache[endpoint_key] then
-        handle.endpoint_slices_cache[endpoint_key] = {}
-    end
-    local endpoint_slices = handle.endpoint_slices_cache[endpoint_key]
-    endpoint_slices[slice_name] = slice
-end
-
-local function get_endpoints_from_cache(handle, endpoint_key)
-    local endpoint_slices = handle.endpoint_slices_cache[endpoint_key] or {}
-    local endpoints = {}
-    for _, endpoint_slice in pairs(endpoint_slices) do
-        for port, targets in pairs(endpoint_slice) do
-            if not endpoints[port] then
-                endpoints[port] = core.table.new(0, #targets)
-            end
-            core.table.insert_tail(endpoints[port], unpack(targets))
-        end
-    end
-
-    return endpoints
-end
-
-local function update_endpoint_dict(handle, endpoints, endpoint_key)
-    local endpoint_content = core.json.encode(endpoints, true)
-    local endpoint_version = ngx.crc32_long(endpoint_content)
-    local _, err
-    _, err = handle.endpoint_dict:safe_set(endpoint_key .. "#version", 
endpoint_version)
-    if err then
-        return false, "set endpoint version into discovery DICT failed, " .. 
err
-    end
-    _, err = handle.endpoint_dict:safe_set(endpoint_key, endpoint_content)
-    if err then
-        handle.endpoint_dict:delete(endpoint_key .. "#version")
-        return false, "set endpoint into discovery DICT failed, " .. err
-    end
-
-    return true
-end
-
-local function validate_endpoint_slice(endpoint_slice)
-    if not endpoint_slice.metadata then
-        return false, "endpoint_slice has no metadata, endpointSlice: "
-                .. core.json.encode(endpoint_slice)
-    end
-    if not endpoint_slice.metadata.name then
-        return false, "endpoint_slice has no metadata.name, endpointSlice: "
-                .. core.json.encode(endpoint_slice)
-    end
-    if not endpoint_slice.metadata.namespace then
-        return false, "endpoint_slice has no metadata.namespace, 
endpointSlice: "
-                .. core.json.encode(endpoint_slice)
-    end
-    if not endpoint_slice.metadata.labels
-            or not 
endpoint_slice.metadata.labels[kubernetes_service_name_label] then
-        return false, "endpoint_slice has no service-name, endpointSlice: "
-                .. core.json.encode(endpoint_slice)
-    end
-
-    return true
-end
-
-local function on_endpoint_slices_modified(handle, endpoint_slice, operate)
-    local ok, err = validate_endpoint_slice(endpoint_slice)
-    if not ok then
-        core.log.error("endpoint_slice validation fail: ", err)
-        return
-    end
-    if handle.namespace_selector and
-            not handle:namespace_selector(endpoint_slice.metadata.namespace) 
then
-        return
-    end
-
-    core.log.debug("get endpoint_slice: ", 
core.json.delay_encode(endpoint_slice))
-    --record nodes to every port in service
-    local port_to_nodes = {}
-
-    local slice_endpoints = endpoint_slice.endpoints
-    if not slice_endpoints or slice_endpoints == ngx.null then
-        slice_endpoints = {}
-    end
-
-    for _, endpoint in ipairs(slice_endpoints) do
-        if endpoint.addresses
-                and endpoint.conditions
-                and endpoint.conditions.ready then
-            local addresses = endpoint.addresses
-            for _, port in ipairs(endpoint_slice.ports or {}) do
-                local port_name
-                if port.name then
-                    port_name = port.name
-                elseif port.targetPort then
-                    port_name = tostring(port.targetPort)
-                else
-                    port_name = tostring(port.port)
-                end
-
-                local nodes = port_to_nodes[port_name]
-                if nodes == nil then
-                    nodes = core.table.new(0, #slice_endpoints * #addresses)
-                    port_to_nodes[port_name] = nodes
-                end
-
-                for _, ip in ipairs(addresses) do
-                    core.table.insert(nodes, {
-                        host = ip,
-                        port = port.port,
-                        weight = handle.default_weight
-                    })
-                end
-            end
-        end
-    end
-
-    local endpoint_key = endpoint_slice.metadata.namespace
-            .. "/" .. 
endpoint_slice.metadata.labels[kubernetes_service_name_label]
-    update_endpoint_slices_cache(handle, endpoint_key, port_to_nodes, 
endpoint_slice.metadata.name)
-
-    local cached_endpoints = get_endpoints_from_cache(handle, endpoint_key)
-    for _, nodes in pairs(cached_endpoints) do
-        core.table.sort(nodes, sort_nodes_cmp)
-    end
-
-    local ok, err = update_endpoint_dict(handle, cached_endpoints, 
endpoint_key)
-    if not ok then
-        core.log.error("failed to update endpoint dict for endpoint: ", 
endpoint_key,
-                ", err: ", err)
-        return
-    end
-    if operate == "list" then
-        handle.current_keys_hash[endpoint_key] = true
-        handle.current_keys_hash[endpoint_key .. "#version"] = true
-    end
-end
-
-local function on_endpoint_slices_deleted(handle, endpoint_slice)
-    local ok, err = validate_endpoint_slice(endpoint_slice)
-    if not ok then
-        core.log.error("endpoint_slice validation fail: ", err)
-        return
-    end
-
-    if handle.namespace_selector and
-            not handle:namespace_selector(endpoint_slice.metadata.namespace) 
then
-        return
-    end
-
-    core.log.debug("delete endpoint_slice: ", 
core.json.delay_encode(endpoint_slice))
-
-    local endpoint_key = endpoint_slice.metadata.namespace
-            .. "/" .. 
endpoint_slice.metadata.labels[kubernetes_service_name_label]
-    update_endpoint_slices_cache(handle, endpoint_key, nil, 
endpoint_slice.metadata.name)
-
-    local cached_endpoints = get_endpoints_from_cache(handle, endpoint_key)
-    for _, nodes in pairs(cached_endpoints) do
-        core.table.sort(nodes, sort_nodes_cmp)
-    end
-
-    ok, err = update_endpoint_dict(handle, cached_endpoints, endpoint_key)
-    if not ok then
-        core.log.error("failed to update endpoint dict for endpoint: ", 
endpoint_key,
-                ", err: ", err)
-    end
-end
-
-local function on_endpoint_modified(handle, endpoint, operate)
-    if handle.namespace_selector and
-            not handle:namespace_selector(endpoint.metadata.namespace) then
-        return
-    end
-
-    core.log.debug(core.json.delay_encode(endpoint))
-    core.table.clear(endpoint_buffer)
-
-    local subsets = endpoint.subsets
-    for _, subset in ipairs(subsets or {}) do
-        if subset.addresses then
-            local addresses = subset.addresses
-            for _, port in ipairs(subset.ports or {}) do
-                local port_name
-                if port.name then
-                    port_name = port.name
-                elseif port.targetPort then
-                    port_name = tostring(port.targetPort)
-                else
-                    port_name = tostring(port.port)
-                end
-
-                local nodes = endpoint_buffer[port_name]
-                if nodes == nil then
-                    nodes = core.table.new(0, #subsets * #addresses)
-                    endpoint_buffer[port_name] = nodes
-                end
-
-                for _, address in ipairs(subset.addresses) do
-                    core.table.insert(nodes, {
-                        host = address.ip,
-                        port = port.port,
-                        weight = handle.default_weight
-                    })
-                end
-            end
-        end
-    end
-
-
-    for _, nodes in pairs(endpoint_buffer) do
-        core.table.sort(nodes, sort_nodes_cmp)
-    end
-
-    local endpoint_key = endpoint.metadata.namespace .. "/" .. 
endpoint.metadata.name
-    local ok, err = update_endpoint_dict(handle, endpoint_buffer, endpoint_key)
-    if not ok then
-        core.log.error("failed to update endpoint dict for endpoint: ", 
endpoint_key,
-                ", err: ", err)
-        return
-    end
-    if operate == "list" then
-        handle.current_keys_hash[endpoint_key] = true
-        handle.current_keys_hash[endpoint_key .. "#version"] = true
-    end
-end
-
-
-local function on_endpoint_deleted(handle, endpoint)
-    if handle.namespace_selector and
-            not handle:namespace_selector(endpoint.metadata.namespace) then
-        return
-    end
-
-    core.log.debug(core.json.delay_encode(endpoint))
-    local endpoint_key = endpoint.metadata.namespace .. "/" .. 
endpoint.metadata.name
-    handle.endpoint_dict:delete(endpoint_key .. "#version")
-    handle.endpoint_dict:delete(endpoint_key)
-end
-
-
-local function pre_list(handle)
-    handle.current_keys_hash = {}
-    handle.existing_keys = handle.endpoint_dict:get_keys(0)
-    if handle.endpoint_slices_cache then
-        handle.endpoint_slices_cache = {}
-    end
-end
-
-
-local function post_list(handle)
-    if handle.existing_keys and handle.current_keys_hash then
-        for _, key in ipairs(handle.existing_keys) do
-            if not handle.current_keys_hash[key] then
-                core.log.info("kubernetes discovery module found dirty data in 
shared dict, key: ",
-                              key)
-                handle.endpoint_dict:delete(key)
-            end
-        end
-        handle.existing_keys = nil
-        handle.current_keys_hash = nil
-    end
-    local _, err = handle.endpoint_dict:safe_set("discovery_ready", true)
-    if err then
-        core.log.error("set discovery_ready flag into discovery DICT failed, 
", err)
-    end
-end
-
-
-local function setup_label_selector(conf, informer)
-    informer.label_selector = conf.label_selector
-end
-
-
-local function setup_namespace_selector(conf, informer)
-    local ns = conf.namespace_selector
-    if ns == nil then
-        informer.namespace_selector = nil
-        return
-    end
-
-    if ns.equal then
-        informer.field_selector = "metadata.namespace=" .. ns.equal
-        informer.namespace_selector = nil
-        return
-    end
-
-    if ns.not_equal then
-        informer.field_selector = "metadata.namespace!=" .. ns.not_equal
-        informer.namespace_selector = nil
-        return
-    end
-
-    if ns.match then
-        informer.namespace_selector = function(self, namespace)
-            local match = conf.namespace_selector.match
-            local m, err
-            for _, v in ipairs(match) do
-                m, err = ngx.re.match(namespace, v, "jo")
-                if m and m[0] == namespace then
-                    return true
-                end
-                if err then
-                    core.log.error("ngx.re.match failed: ", err)
-                end
-            end
-            return false
-        end
-        return
-    end
-
-    if ns.not_match then
-        informer.namespace_selector = function(self, namespace)
-            local not_match = conf.namespace_selector.not_match
-            local m, err
-            for _, v in ipairs(not_match) do
-                m, err = ngx.re.match(namespace, v, "jo")
-                if m and m[0] == namespace then
-                    return false
-                end
-                if err then
-                    return false
-                end
-            end
-            return true
-        end
-        return
-    end
-
-    return
-end
-
-
-local function read_env(key)
-    if #key > 3 then
-        local first, second = string.byte(key, 1, 2)
-        if first == string.byte('$') and second == string.byte('{') then
-            local last = string.byte(key, #key)
-            if last == string.byte('}') then
-                local env = string.sub(key, 3, #key - 1)
-                local value = os.getenv(env)
-                if not value then
-                    return nil, "not found environment variable " .. env
-                end
-                return value
-            end
-        end
-    end
-    return key
-end
-
-local function read_token(token_file)
-    local token, err = util.read_file(token_file)
-    if err then
-        return nil, err
-    end
-
-    -- remove possible extra whitespace
-    return util.trim(token)
-end
-
-local function get_apiserver(conf)
-    local apiserver = {
-        schema = "",
-        host = "",
-        port = "",
-    }
-
-    apiserver.schema = conf.service.schema
-    if apiserver.schema ~= "http" and apiserver.schema ~= "https" then
-        return nil, "service.schema should set to one of [http,https] but " .. 
apiserver.schema
-    end
-
-    local err
-    apiserver.host, err = read_env(conf.service.host)
-    if err then
-        return nil, err
-    end
-
-    if apiserver.host == "" then
-        return nil, "service.host should set to non-empty string"
-    end
-
-    local port
-    port, err = read_env(conf.service.port)
-    if err then
-        return nil, err
-    end
-
-    apiserver.port = tonumber(port)
-    if not apiserver.port or apiserver.port <= 0 or apiserver.port > 65535 then
-        return nil, "invalid port value: " .. apiserver.port
-    end
-
-    if conf.client.token then
-        local token, err = read_env(conf.client.token)
-        if err then
-            return nil, err
-        end
-        apiserver.token = util.trim(token)
-    elseif conf.client.token_file and conf.client.token_file ~= "" then
-        setmetatable(apiserver, {
-            __index = function(_, key)
-                if key ~= "token" then
-                    return
-                end
-
-                local token_file, err = read_env(conf.client.token_file)
-                if err then
-                    core.log.error("failed to read token file path: ", err)
-                    return
-                end
-
-                local token, err = read_token(token_file)
-                if err then
-                    core.log.error("failed to read token from file: ", err)
-                    return
-                end
-                core.log.debug("re-read the token value")
-                return token
-            end
-        })
-    else
-        return nil, "one of [client.token,client.token_file] should be set but 
none"
-    end
-
-    if apiserver.schema == "https" and apiserver.token == "" then
-        return nil, "apiserver.token should set to non-empty string when 
service.schema is https"
-    end
-
-    -- ssl_verify: use explicit config if set, otherwise default to false
-    if conf.service.ssl_verify ~= nil then
-        apiserver.ssl_verify = conf.service.ssl_verify
-    else
-        apiserver.ssl_verify = false
-    end
-
-    return apiserver
-end
-
-local function create_endpoint_lrucache(endpoint_dict, endpoint_key, 
endpoint_port)
-    local endpoint_content = endpoint_dict:get(endpoint_key)
-    if not endpoint_content then
-        core.log.error("get empty endpoint content from discovery DIC, this 
should not happen ",
-                endpoint_key)
-        return nil
-    end
-
-    local endpoint = core.json.decode(endpoint_content)
-    if not endpoint then
-        core.log.error("decode endpoint content failed, this should not 
happen, content: ",
-                endpoint_content)
-        return nil
-    end
-
-    return endpoint[endpoint_port]
-end
-
 
 local _M = {
     version = "0.0.1"
 }
 
 
-local function start_fetch(handle)
-    local timer_runner
-    timer_runner = function(premature)
-        if premature then
-            return
-        end
-
-        local ok, status = pcall(handle.list_watch, handle, handle.apiserver)
-
-        local retry_interval = 0
-        if not ok then
-            core.log.error("list_watch failed, kind: ", handle.kind,
-                    ", reason: ", "RuntimeException", ", message : ", status)
-            retry_interval = 40
-        elseif not status then
-            retry_interval = 40
-        end
-
-        ngx.timer.at(retry_interval, timer_runner)
-    end
-    ngx.timer.at(0, timer_runner)
-end
-
-
 local function get_endpoint_dict_name(id)
     local shm = "kubernetes"
 
@@ -570,71 +74,26 @@ local function single_mode_init(conf)
         return
     end
 
-    local apiserver, err = get_apiserver(conf)
-    if err then
-        error(err)
-        return
-    end
-
-    local default_weight = conf.default_weight
-    local endpoints_informer, err
-    if conf.watch_endpoint_slices then
-        endpoints_informer, err = informer_factory.new("discovery.k8s.io", 
"v1",
-                                                       "EndpointSlice", 
"endpointslices", "")
-    else
-        endpoints_informer, err = informer_factory.new("", "v1", "Endpoints", 
"endpoints", "")
-    end
+    local handle, err = k8s_core.create_handle(conf, {
+        endpoint_dict = endpoint_dict,
+    })
     if err then
         error(err)
         return
     end
 
-    setup_namespace_selector(conf, endpoints_informer)
-    setup_label_selector(conf, endpoints_informer)
-
-    if conf.watch_endpoint_slices then
-        endpoints_informer.on_added = on_endpoint_slices_modified
-        endpoints_informer.on_modified = on_endpoint_slices_modified
-        endpoints_informer.on_deleted = on_endpoint_slices_deleted
-        endpoints_informer.endpoint_slices_cache = {}
-    else
-        endpoints_informer.on_added = on_endpoint_modified
-        endpoints_informer.on_modified = on_endpoint_modified
-        endpoints_informer.on_deleted = on_endpoint_deleted
-    end
-
-    endpoints_informer.pre_list = pre_list
-    endpoints_informer.post_list = post_list
-
-    ctx = setmetatable({
-        endpoint_dict = endpoint_dict,
-        apiserver = apiserver,
-        default_weight = default_weight
-    }, { __index = endpoints_informer })
-
-    start_fetch(ctx)
+    ctx = handle
+    k8s_core.start_fetch(ctx)
 end
 
 
 local function single_mode_nodes(service_name)
-    local pattern = "^(.*):(.*)$" -- namespace/name:port_name
-    local match = ngx.re.match(service_name, pattern, "jo")
-    if not match then
-        core.log.error("get unexpected upstream service_name: ", service_name)
-        return nil
-    end
-
-    local endpoint_dict = ctx
-    local endpoint_key = match[1]
-    local endpoint_port = match[2]
-    local endpoint_version = endpoint_dict:get(endpoint_key .. "#version")
-    if not endpoint_version then
-        core.log.info("get empty endpoint version from discovery DICT ", 
endpoint_key)
-        return nil
-    end
-
-    return endpoint_lrucache(service_name, endpoint_version,
-            create_endpoint_lrucache, endpoint_dict, endpoint_key, 
endpoint_port)
+    return k8s_core.resolve_nodes(
+        endpoint_lrucache, service_name,
+        "^(.*):(.*)$",   -- namespace/name:port_name
+        function(match)
+            return ctx, match[1], match[2]
+        end)
 end
 
 
@@ -678,81 +137,36 @@ local function multiple_mode_init(confs)
                     "please check your APISIX version")
         end
 
-        local apiserver, err = get_apiserver(conf)
-        if err then
-            error(err)
-            return
-        end
-
-        local default_weight = conf.default_weight
-
-        local endpoints_informer, err
-        if conf.watch_endpoint_slices then
-            endpoints_informer, err = informer_factory.new("discovery.k8s.io", 
"v1",
-                                                           "EndpointSlice", 
"endpointslices", "")
-        else
-            endpoints_informer, err = informer_factory.new("", "v1", 
"Endpoints", "endpoints", "")
-        end
+        local handle, err = k8s_core.create_handle(conf, {
+            endpoint_dict = endpoint_dict,
+        })
         if err then
             error(err)
             return
         end
 
-        setup_namespace_selector(conf, endpoints_informer)
-        setup_label_selector(conf, endpoints_informer)
-
-        if conf.watch_endpoint_slices then
-            endpoints_informer.on_added = on_endpoint_slices_modified
-            endpoints_informer.on_modified = on_endpoint_slices_modified
-            endpoints_informer.on_deleted = on_endpoint_slices_deleted
-            endpoints_informer.endpoint_slices_cache = {}
-        else
-            endpoints_informer.on_added = on_endpoint_modified
-            endpoints_informer.on_modified = on_endpoint_modified
-            endpoints_informer.on_deleted = on_endpoint_deleted
-        end
-
-        endpoints_informer.pre_list = pre_list
-        endpoints_informer.post_list = post_list
-
-        ctx[id] = setmetatable({
-            endpoint_dict = endpoint_dict,
-            apiserver = apiserver,
-            default_weight = default_weight
-        }, { __index = endpoints_informer })
+        ctx[id] = handle
     end
 
     for _, item in pairs(ctx) do
-        start_fetch(item)
+        k8s_core.start_fetch(item)
     end
 end
 
 
 local function multiple_mode_nodes(service_name)
-    local pattern = "^(.*)/(.*/.*):(.*)$" -- id/namespace/name:port_name
-    local match = ngx.re.match(service_name, pattern, "jo")
-    if not match then
-        core.log.error("get unexpected upstream service_name: ", service_name)
-        return nil
-    end
-
-    local id = match[1]
-    local endpoint_dict = ctx[id]
-    if not endpoint_dict then
-        core.log.error("id not exist")
-        return nil
-    end
-
-    local endpoint_key = match[2]
-    local endpoint_port = match[3]
-    local endpoint_version = endpoint_dict:get(endpoint_key .. "#version")
-    if not endpoint_version then
-        core.log.info("get empty endpoint version from discovery DICT ", 
endpoint_key)
-        return nil
-    end
-
-    return endpoint_lrucache(service_name, endpoint_version,
-            create_endpoint_lrucache, endpoint_dict, endpoint_key, 
endpoint_port)
+    return k8s_core.resolve_nodes(
+        endpoint_lrucache, service_name,
+        "^(.*)/(.*/.*):(.*)$",   -- id/namespace/name:port_name
+        function(match)
+            local id = match[1]
+            local endpoint_dict = ctx[id]
+            if not endpoint_dict then
+                core.log.error("id not exist")
+                return nil
+            end
+            return endpoint_dict, match[2], match[3]
+        end)
 end
 
 
@@ -769,52 +183,22 @@ function _M.init_worker()
 end
 
 
-local function dump_endpoints_from_dict(endpoint_dict)
-    local keys, err = endpoint_dict:get_keys(0)
-    if err then
-        core.log.error("get keys from discovery dict failed: ", err)
-        return
-    end
-
-    if not keys or #keys == 0 then
-        return
-    end
-
-    local endpoints = {}
-    for i = 1, #keys do
-        local key = keys[i]
-        -- skip key with suffix #version
-        if key:sub(-#"#version") ~= "#version" then
-            local value = endpoint_dict:get(key)
-            core.table.insert(endpoints, {
-                name = key,
-                value = value
-            })
-        end
-    end
-
-    return endpoints
-end
-
-
 function _M.dump_data()
     local discovery_conf = local_conf.discovery.kubernetes
     local eps = {}
 
     if #discovery_conf == 0 then
-        -- Single mode: discovery_conf is a single configuration object
         local endpoint_dict = get_endpoint_dict()
-        local endpoints = dump_endpoints_from_dict(endpoint_dict)
+        local endpoints = k8s_core.dump_endpoints_from_dict(endpoint_dict)
         if endpoints then
             core.table.insert(eps, {
                 endpoints = endpoints
             })
         end
     else
-        -- Multiple mode: discovery_conf is an array of configuration objects
         for _, conf in ipairs(discovery_conf) do
             local endpoint_dict = get_endpoint_dict(conf.id)
-            local endpoints = dump_endpoints_from_dict(endpoint_dict)
+            local endpoints = k8s_core.dump_endpoints_from_dict(endpoint_dict)
             if endpoints then
                 core.table.insert(eps, {
                     id = conf.id,
@@ -836,7 +220,6 @@ local function check_ready(id)
         return false, "failed to get lua_shared_dict: " .. 
get_endpoint_dict_name(id)
             .. ", please check your APISIX version"
     end
-    -- check flag
     local ready = endpoint_dict:get("discovery_ready")
     if not ready then
         core.log.warn("kubernetes discovery not ready")
diff --git a/apisix/discovery/kubernetes/schema.lua 
b/apisix/discovery/kubernetes/schema.lua
index b3f35ae72..0aa39c6bd 100644
--- a/apisix/discovery/kubernetes/schema.lua
+++ b/apisix/discovery/kubernetes/schema.lua
@@ -131,9 +131,6 @@ return {
                         },
                         ssl_verify = {
                             type = "boolean",
-                            description = "Verify the TLS certificate of the 
Kubernetes API " ..
-                                          "server. Defaults to false. Set to 
true to enable " ..
-                                          "certificate verification.",
                         },
                     },
                     default = {
@@ -198,9 +195,6 @@ return {
                             },
                             ssl_verify = {
                                 type = "boolean",
-                                description = "Verify the TLS certificate of 
the Kubernetes " ..
-                                              "API server. Defaults to false. 
Set to true to " ..
-                                              "enable certificate 
verification.",
                             },
                         },
                         required = { "host", "port" }
diff --git a/apisix/discovery/nacos/client.lua 
b/apisix/discovery/nacos/client.lua
new file mode 100644
index 000000000..3f674d3a1
--- /dev/null
+++ b/apisix/discovery/nacos/client.lua
@@ -0,0 +1,365 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+--- Reusable HTTP client primitives for Nacos service discovery.
+--- Extracted from init.lua so that both static-config mode and
+--- dynamic-config mode can share the same core logic.
+
+local require         = require
+local http            = require('resty.http')
+local core            = require('apisix.core')
+local ipairs          = ipairs
+local type            = type
+local ngx             = ngx
+local string          = string
+local string_sub      = string.sub
+local str_byte        = string.byte
+local str_find        = core.string.find
+local log             = core.log
+
+local auth_path = 'auth/login'
+local instance_list_path = 'ns/instance/list?healthyOnly=true&serviceName='
+local default_namespace_id = "public"
+local default_group_name = "DEFAULT_GROUP"
+
+
+local _M = {}
+
+
+-- ─── HTTP primitives ──────────────────────────────────────────────────
+
+--- Send one HTTP request to nacos and decode the JSON response body.
+---
+--- request_uri  (string)  base URI; `path` is appended to it as-is
+--- path         (string)  path plus query string, relative to request_uri
+--- body         (any)     a table is JSON-encoded and sent with a JSON
+---                        Content-Type; any other value is passed through
+--- method       (string)  HTTP method handed to resty.http
+--- basic_auth   (string)  value of the Authorization header (optional)
+--- timeout      (table)   { connect, send, read } in ms (optional); missing
+---                        fields default to 2000 / 5000 / 5000
+---
+--- Returns: the decoded response body when the status is 200
+---          On failure: nil, err_string
+function _M.request(request_uri, path, body, method, basic_auth, timeout)
+    local url = request_uri .. path
+    -- the login path carries the password and later queries carry the access
+    -- token in their query string; mask both so they never reach the logs
+    local loggable_path = string.gsub(path, '([?&]password=)[^&]*', '%1***')
+    loggable_path = string.gsub(loggable_path, '([?&]accessToken=)[^&]*', '%1***')
+    log.info('request url:', request_uri, loggable_path)
+    local headers = {}
+    headers['Accept'] = 'application/json'
+
+    if basic_auth then
+        headers['Authorization'] = basic_auth
+    end
+
+    if body and 'table' == type(body) then
+        local err
+        body, err = core.json.encode(body)
+        if not body then
+            return nil, 'invalid body : ' .. err
+        end
+        headers['Content-Type'] = 'application/json'
+    end
+
+    local httpc = http.new()
+    timeout = timeout or {}
+    local connect_timeout = timeout.connect or 2000
+    local send_timeout = timeout.send or 5000
+    local read_timeout = timeout.read or 5000
+    httpc:set_timeouts(connect_timeout, send_timeout, read_timeout)
+    local res, err = httpc:request_uri(url, {
+        method = method,
+        headers = headers,
+        body = body,
+        ssl_verify = true,
+    })
+    if not res then
+        return nil, err
+    end
+
+    -- anything but a 200 with a body is reported by its status code only
+    if not res.body or res.status ~= 200 then
+        return nil, 'status = ' .. res.status
+    end
+
+    local json_str = res.body
+    local data, decode_err = core.json.decode(json_str)
+    if not data then
+        return nil, decode_err
+    end
+    return data
+end
+
+
+-- ─── authentication ───────────────────────────────────────────────────
+
+--- Log in to nacos and build the `&accessToken=...` query fragment.
+---
+--- Returns: '' when username or password is missing (no login is attempted),
+---          otherwise '&accessToken=<token>'
+---          On failure: nil, err_string
+function _M.get_token_param(base_uri, username, password, timeout)
+    if not (username and password) then
+        return ''
+    end
+
+    local login_query = ngx.encode_args({ username = username, password = password })
+    local login_path = auth_path .. '?' .. login_query
+    local resp, err = _M.request(base_uri, login_path, nil, 'POST', nil, timeout)
+    if err then
+        log.error('nacos login fail:', username, ' desc:', err)
+        return nil, err
+    end
+
+    -- a reply that is not an object, or carries no usable token, is a failure
+    local token = type(resp) == "table" and resp.accessToken
+    if not token or token == "" then
+        return nil, 'nacos login response missing accessToken'
+    end
+    return '&accessToken=' .. token
+end
+
+
+--- Build the `&ak=...&data=...&signature=...` query fragment used for
+--- AK/SK authentication.
+---
+--- The signed payload is "<now in ms>@@<group_name>@@<service_name>"; the
+--- signature is its HMAC-SHA1 under secret_key, base64-encoded.
+---
+--- Returns: '' when access_key or secret_key is nil or empty
+function _M.get_signed_param(group_name, service_name, access_key, secret_key)
+    if not access_key or access_key == ''
+            or not secret_key or secret_key == '' then
+        return ''
+    end
+
+    local payload = ngx.now() * 1000 .. '@@' .. group_name .. '@@' .. service_name
+    local digest = ngx.hmac_sha1(secret_key, payload)
+    return '&' .. ngx.encode_args({
+        ak = access_key,
+        data = payload,
+        signature = ngx.encode_base64(digest),
+    })
+end
+
+
+-- ─── URL building ─────────────────────────────────────────────────────
+
+--- Turn a configured nacos host URL into a base URI plus credentials.
+---
+--- A "user:password@" part between "://" and the last '@' is removed from the
+--- URL and returned separately. `prefix` (optional) is appended, and the
+--- result always ends with '/'.
+---
+--- Returns: base_uri, username, password (the last two are nil when the URL
+---          carries no "user:password" pair)
+---          nil when the URL contains '@' but no "://" in front of it
+function _M.build_base_uri(url, prefix)
+    local username, password
+    local at_pos = core.string.rfind_char(url, '@')
+    if at_pos then
+        local scheme_pos = str_find(url, '://')
+        if not scheme_pos or scheme_pos >= at_pos then
+            return nil
+        end
+
+        local credentials = string_sub(url, scheme_pos + 3, at_pos - 1)
+        -- split on the first ':' only, so the password may itself contain ':'
+        local sep = str_find(credentials, ':')
+        if sep then
+            username = string_sub(credentials, 1, sep - 1)
+            password = string_sub(credentials, sep + 1)
+        end
+
+        -- keep "scheme://" and everything after the '@'
+        url = string_sub(url, 1, scheme_pos + 2) .. string_sub(url, at_pos + 1)
+    end
+
+    if prefix then
+        url = url .. prefix
+    end
+
+    local last_byte = str_byte(url, #url)
+    if last_byte ~= str_byte('/') then
+        url = url .. '/'
+    end
+
+    return url, username, password
+end
+
+
+-- ─── query param helpers ──────────────────────────────────────────────
+
+-- Returns the "&namespaceId=..." query fragment, or '' when namespace_id is
+-- not set.
+local function get_namespace_param(namespace_id)
+    if not namespace_id then
+        return ''
+    end
+    return '&' .. ngx.encode_args({ namespaceId = namespace_id })
+end
+
+
+-- Returns the "&groupName=..." query fragment, or '' when group_name is not
+-- set.
+local function get_group_name_param(group_name)
+    if not group_name then
+        return ''
+    end
+    return '&' .. ngx.encode_args({ groupName = group_name })
+end
+
+
+-- True for the two gRPC upstream schemes, false for anything else.
+local function is_grpc(scheme)
+    if scheme == 'grpc' then
+        return true
+    end
+    return scheme == 'grpcs'
+end
+
+
+-- ─── instance fetching ────────────────────────────────────────────────
+
+--- Fetch instances from a single nacos host for a list of services.
+---
+--- Logs in first when both username and password are given, then requests the
+--- healthy instance list of every entry in `services`. A service whose request
+--- fails is logged and left out of both result tables.
+---
+--- services: array of { service_name, namespace_id, group_name, scheme }.
+---   namespace_id / group_name may be nil: the matching query parameter is
+---   then omitted, and "public" / "DEFAULT_GROUP" stand in for it when the
+---   result key and the signature are built.
+---
+--- Returns: nodes_cache (table of key → nodes), service_names (set of key → true)
+---          On failure (login failed, or no service could be fetched):
+---          nil, nil, err_string
+---
+--- options:
+---   default_weight     (number)    weight for instances that report none
+---                                  (default: 100)
+---   access_key         (string)    AK for HMAC-SHA1 signing (optional)
+---   secret_key         (string)    SK for HMAC-SHA1 signing (optional)
+---   timeout            (table)     { connect, send, read } in ms
+---   preserve_metadata  (bool)      include instance.metadata in returned nodes
+---   key_builder        (function)  key_builder(namespace_id, group_name, service_name)
+---                                  returns the key to use for this service in the
+---                                  result. default: ns_id.group.service
+
+function _M.fetch_from_host(base_uri, username, password, services, options)
+    options = options or {}
+    local dw = options.default_weight or 100
+    local ak = options.access_key
+    local sk = options.secret_key
+    local timeout = options.timeout
+    local preserve_metadata = options.preserve_metadata
+    local key_builder = options.key_builder
+
+    local token_param, err = _M.get_token_param(base_uri, username, password, timeout)
+    if err then
+        return nil, nil, err
+    end
+
+    local service_names = {}
+    local nodes_cache = {}
+    local had_success = false
+
+    for _, service_info in ipairs(services) do
+        local namespace_id = service_info.namespace_id
+        local group_name = service_info.group_name
+        local service_name = service_info.service_name
+        local scheme = service_info.scheme or ''
+        -- the key and the signed payload are built by concatenation, which
+        -- raises on nil; fall back to the same defaults the scanner applies
+        local key_namespace_id = namespace_id or default_namespace_id
+        local key_group_name = group_name or default_group_name
+
+        local namespace_param = get_namespace_param(namespace_id)
+        local group_name_param = get_group_name_param(group_name)
+        local signature_param = _M.get_signed_param(
+            key_group_name, service_name, ak, sk)
+        local query_path = instance_list_path .. service_name
+                           .. token_param .. namespace_param .. group_name_param
+                           .. signature_param
+        local data, req_err = _M.request(base_uri, query_path, nil, 'GET', nil, timeout)
+        if req_err then
+            log.error('get_url:', query_path, ' err:', req_err)
+        else
+            had_success = true
+
+            local key
+            if key_builder then
+                key = key_builder(key_namespace_id, key_group_name, service_name)
+            else
+                key = key_namespace_id .. '.' .. key_group_name .. '.' .. service_name
+            end
+            service_names[key] = true
+
+            -- a body that is not a JSON object (e.g. `null`) cannot be
+            -- indexed; treat it like a reply without instances
+            local hosts = type(data) == 'table' and data.hosts
+            if type(hosts) ~= 'table' then
+                hosts = {}
+            end
+
+            local nodes = {}
+            for _, host in ipairs(hosts) do
+                local node = {
+                    host = host.ip,
+                    port = host.port,
+                    weight = host.weight or dw,
+                }
+                -- gRPC upstreams use the port advertised in the instance
+                -- metadata when there is one
+                if is_grpc(scheme) and host.metadata and host.metadata.gRPC_port then
+                    node.port = host.metadata.gRPC_port
+                end
+                if preserve_metadata and host.metadata then
+                    node.metadata = host.metadata
+                end
+                core.table.insert(nodes, node)
+            end
+
+            -- stored even when empty, so the key is still present in the result
+            nodes_cache[key] = nodes
+        end
+    end
+
+    if not had_success then
+        return nil, nil, 'all nacos services fetch failed'
+    end
+
+    return nodes_cache, service_names
+end
+
+
+-- ─── service scanning ─────────────────────────────────────────────────
+
+-- Returns true when `services` already holds an entry with the same
+-- namespace, group, service name and scheme; false otherwise.
+local function de_duplication(services, namespace_id, group_name, service_name, scheme)
+    for _, known in ipairs(services) do
+        local same_location = known.namespace_id == namespace_id
+                              and known.group_name == group_name
+        local same_target = known.service_name == service_name
+                            and known.scheme == scheme
+        if same_location and same_target then
+            return true
+        end
+    end
+    return false
+end
+
+
+-- Walk one config collection and append every not-yet-seen nacos upstream
+-- reference to `services`.
+--
+-- values: collection accepted by core.config_util.iterate_values, or nil
+-- filter: optional function(upstream) → bool; only called for upstreams whose
+--         discovery_type is 'nacos', and a falsy result skips the upstream
+local function iter_and_add_service(services, values, filter)
+    if not values then
+        return
+    end
+
+    for _, item in core.config_util.iterate_values(values) do
+        local conf = item.value
+        -- use the embedded `upstream` table when the object has one,
+        -- otherwise the object itself is taken as the upstream
+        local up = conf and (conf.upstream or conf)
+
+        if up and up.discovery_type == 'nacos' and (not filter or filter(up)) then
+            local args = up.discovery_args
+            local namespace_id = (args and args.namespace_id) or default_namespace_id
+            local group_name = (args and args.group_name) or default_group_name
+
+            local seen = de_duplication(services, namespace_id, group_name,
+                                        up.service_name, up.scheme)
+            if not seen then
+                core.table.insert(services, {
+                    service_name = up.service_name,
+                    namespace_id = namespace_id,
+                    group_name = group_name,
+                    scheme = up.scheme,
+                })
+            end
+        end
+    end
+end
+
+
+--- Collect every nacos discovery reference found in APISIX upstreams, HTTP
+--- routes, services and stream routes, without duplicates.
+--- filter: optional function(upstream) → bool, called on each nacos upstream
+---         config; a falsy result leaves that upstream out.
+--- Returns: array of { service_name, namespace_id, group_name, scheme }
+function _M.get_nacos_services(filter)
+    -- required here instead of at the top of the file to work around a
+    -- circular dependency between these modules
+    local upstream_mod = require('apisix.upstream')
+    local router_mod = require('apisix.router')
+    local service_mod = require('apisix.http.service')
+
+    local found = {}
+    iter_and_add_service(found, upstream_mod.upstreams(), filter)
+    iter_and_add_service(found, router_mod.http_routes(), filter)
+    iter_and_add_service(found, service_mod.services(), filter)
+    iter_and_add_service(found, router_mod.stream_routes(), filter)
+    return found
+end
+
+
+return _M
diff --git a/apisix/discovery/nacos/init.lua b/apisix/discovery/nacos/init.lua
index 74875a6a9..1c5087061 100644
--- a/apisix/discovery/nacos/init.lua
+++ b/apisix/discovery/nacos/init.lua
@@ -17,418 +17,301 @@
 
 local require            = require
 local local_conf         = require('apisix.core.config_local').local_conf()
-local http               = require('resty.http')
 local core               = require('apisix.core')
+local nacos_client       = require('apisix.discovery.nacos.client')
+local is_http            = ngx.config.subsystem == "http"
 local ipairs             = ipairs
 local pairs              = pairs
-local type               = type
+local error              = error
 local math_random        = math.random
 local ngx                = ngx
-local ngx_re             = require('ngx.re')
 local ngx_timer_at       = ngx.timer.at
-local ngx_timer_every    = ngx.timer.every
-local string             = string
-local string_sub         = string.sub
-local str_byte           = string.byte
-local str_find           = core.string.find
 local log                = core.log
 
-local default_weight
-local nacos_dict = ngx.shared.nacos --key: namespace_id.group_name.service_name
-if not nacos_dict then
-    error("lua_shared_dict \"nacos\" not configured")
-end
-
-local auth_path = 'auth/login'
-local instance_list_path = 'ns/instance/list?healthyOnly=true&serviceName='
-local default_namespace_id = "public"
-local default_group_name = "DEFAULT_GROUP"
-local access_key
-local secret_key
-
-
 local _M = {}
 
-local function get_key(namespace_id, group_name, service_name)
-    return namespace_id .. '.' .. group_name .. '.' .. service_name
-end
-
-local function request(request_uri, path, body, method, basic_auth)
-    local url = request_uri .. path
-    log.info('request url:', url)
-    local headers = {}
-    headers['Accept'] = 'application/json'
+local nacos_dict
+local registries = {}
 
-    if basic_auth then
-        headers['Authorization'] = basic_auth
-    end
+local dict_name = is_http and "nacos" or "nacos-stream"
 
-    if body and 'table' == type(body) then
-        local err
-        body, err = core.json.encode(body)
-        if not body then
-            return nil, 'invalid body : ' .. err
-        end
-        headers['Content-Type'] = 'application/json'
+local function get_dict()
+    if not nacos_dict then
+        nacos_dict = ngx.shared[dict_name]
     end
-
-    local httpc = http.new()
-    local timeout = local_conf.discovery.nacos.timeout
-    local connect_timeout = timeout.connect
-    local send_timeout = timeout.send
-    local read_timeout = timeout.read
-    log.info('connect_timeout:', connect_timeout, ', send_timeout:', 
send_timeout,
-             ', read_timeout:', read_timeout)
-    httpc:set_timeouts(connect_timeout, send_timeout, read_timeout)
-    local res, err = httpc:request_uri(url, {
-        method = method,
-        headers = headers,
-        body = body,
-        ssl_verify = true,
-    })
-    if not res then
-        return nil, err
-    end
-
-    if not res.body or res.status ~= 200 then
-        return nil, 'status = ' .. res.status
-    end
-
-    local json_str = res.body
-    local data, err = core.json.decode(json_str)
-    if not data then
-        return nil, err
-    end
-    return data
+    return nacos_dict
 end
 
 
-local function get_url(request_uri, path)
-    return request(request_uri, path, nil, 'GET', nil)
+local function default_key_builder(id)
+    return function(namespace_id, group_name, service_name)
+        return id .. "/" .. namespace_id .. "/" .. group_name .. "/" .. 
service_name
+    end
 end
 
 
-local function post_url(request_uri, path, body)
-    return request(request_uri, path, body, 'POST', nil)
-end
-
+local function fetch_full_registry(premature, reg)
+    if premature or reg.stop_flag then
+        return
+    end
 
-local function get_token_param(base_uri, username, password)
-    if not username or not password then
-        return ''
+    local dict = get_dict()
+    if not dict then
+        log.error("nacos shared dict not available")
+        return
     end
 
-    local args = { username = username, password = password}
-    local data, err = post_url(base_uri, auth_path .. '?' .. 
ngx.encode_args(args), nil)
-    if err then
-        log.error('nacos login fail:', username, ' ', password, ' desc:', err)
-        return nil, err
+    local services = reg.service_scanner()
+    if reg.stop_flag then
+        return
     end
-    return '&accessToken=' .. data.accessToken
-end
 
+    local prefix = reg.id .. "/"
 
-local function get_namespace_param(namespace_id)
-    local param = ''
-    if namespace_id then
-        local args = {namespaceId = namespace_id}
-        param = '&' .. ngx.encode_args(args)
+    if #services == 0 then
+        local all_keys = dict:get_keys(0)
+        for _, key in ipairs(all_keys) do
+            if core.string.has_prefix(key, prefix) then
+                dict:delete(key)
+            end
+        end
+        if not reg.stop_flag then
+            ngx_timer_at(reg.conf.fetch_interval or 30, fetch_full_registry, 
reg)
+        end
+        return
     end
-    return param
-end
 
+    local hosts = reg.conf.host
+    local host_count = #hosts
+    local start = math_random(host_count)
+    local timeout = reg.conf.timeout
 
-local function get_group_name_param(group_name)
-    local param = ''
-    if group_name then
-        local args = {groupName = group_name}
-        param = '&' .. ngx.encode_args(args)
-    end
-    return param
-end
+    for i = 0, host_count - 1 do
+        if reg.stop_flag then
+            return
+        end
 
+        local idx = (start + i - 1) % host_count + 1
+        local base_uri, username, password = nacos_client.build_base_uri(
+            hosts[idx], reg.conf.prefix)
 
-local function get_signed_param(group_name, service_name)
-    local param = ''
-    if access_key ~= '' and secret_key ~= '' then
-        local str_to_sign = ngx.now() * 1000 .. '@@' .. group_name .. '@@' .. 
service_name
-        local args = {
-            ak = access_key,
-            data = str_to_sign,
-            signature = ngx.encode_base64(ngx.hmac_sha1(secret_key, 
str_to_sign))
-        }
-        param = '&' .. ngx.encode_args(args)
-    end
-    return param
-end
+        if not base_uri then
+            log.warn("nacos host at index ", idx, " is invalid, skip")
+        else
+            local nodes_cache, service_names, err = 
nacos_client.fetch_from_host(
+                base_uri,
+                username or reg.username,
+                password or reg.password,
+                services, {
+                    default_weight    = reg.conf.weight,
+                    access_key        = reg.conf.access_key,
+                    secret_key        = reg.conf.secret_key,
+                    timeout           = timeout,
+                    preserve_metadata = reg.preserve_metadata,
+                    key_builder       = reg.key_builder,
+                })
+
+            if nodes_cache then
+                if reg.stop_flag then
+                    return
+                end
 
+                for key, nodes in pairs(nodes_cache) do
+                    dict:set(key, core.json.encode(nodes))
+                end
 
-local function build_base_uri(url)
-    local auth_idx = core.string.rfind_char(url, '@')
-    local username, password
-    if auth_idx then
-        local protocol_idx = str_find(url, '://')
-        local protocol = string_sub(url, 1, protocol_idx + 2)
-        local user_and_password = string_sub(url, protocol_idx + 3, auth_idx - 
1)
-        local arr = ngx_re.split(user_and_password, ':')
-        if #arr == 2 then
-            username = arr[1]
-            password = arr[2]
-        end
-        local other = string_sub(url, auth_idx + 1)
-        url = protocol .. other
-    end
+                local all_keys = dict:get_keys(0)
+                for _, key in ipairs(all_keys) do
+                    if core.string.has_prefix(key, prefix)
+                            and not service_names[key] then
+                        dict:delete(key)
+                    end
+                end
 
-    if local_conf.discovery.nacos.prefix then
-        url = url .. local_conf.discovery.nacos.prefix
+                log.info("nacos registry updated, id: ", reg.id,
+                         ", services: ", #services)
+                if not reg.stop_flag then
+                    ngx_timer_at(reg.conf.fetch_interval or 30,
+                                 fetch_full_registry, reg)
+                end
+                return
+            end
+            log.error("fetch_from_host: ", base_uri, " err:", err)
+        end
     end
 
-    if str_byte(url, #url) ~= str_byte('/') then
-        url = url .. '/'
+    log.error("failed to fetch nacos registry from all hosts, id: ", reg.id)
+    if not reg.stop_flag then
+        ngx_timer_at(reg.conf.fetch_interval or 30, fetch_full_registry, reg)
     end
-
-    return url, username, password
 end
 
 
-local function get_base_uri_by_index(index)
-    local host = local_conf.discovery.nacos.host
+-- ─── Registry management API ──────────────────────────────────────────
+
+--- Create a nacos registry instance.
+---
+--- conf fields: id, host (array), fetch_interval, prefix, weight,
+---              access_key, secret_key, timeout ({connect,send,read} in ms)
+---
+--- options: service_scanner (function), preserve_metadata (bool),
+---          key_builder (function(ns,group,svc)->string),
+---          username (string), password (string)
+function _M.create_registry(conf, options)
+    options = options or {}
+    local id = conf.id
+    if not id or id == "" then
+        return nil, "registry id is required"
+    end
 
-    local url = host[index]
-    if not url then
-        return nil
+    if registries[id] then
+        _M.stop_registry(id)
     end
 
-    return build_base_uri(url)
+    local reg = {
+        id              = id,
+        conf            = conf,
+        stop_flag       = false,
+        preserve_metadata = options.preserve_metadata or false,
+        key_builder     = options.key_builder or default_key_builder(id),
+        service_scanner = options.service_scanner or function()
+            return nacos_client.get_nacos_services()
+        end,
+        username        = options.username,
+        password        = options.password,
+    }
+
+    registries[id] = reg
+    return reg
 end
 
 
-local function de_duplication(services, namespace_id, group_name, 
service_name, scheme)
-    for _, service in ipairs(services) do
-        if service.namespace_id == namespace_id and service.group_name == 
group_name
-                and service.service_name == service_name and service.scheme == 
scheme then
-            return true
-        end
-    end
-    return false
+function _M.start_registry(reg)
+    ngx_timer_at(0, fetch_full_registry, reg)
 end
 
 
-local function iter_and_add_service(services, values)
-    if not values then
+function _M.stop_registry(id)
+    local reg = registries[id]
+    if not reg then
         return
     end
 
-    for _, value in core.config_util.iterate_values(values) do
-        local conf = value.value
-        if not conf then
-            goto CONTINUE
-        end
-
-        local up
-        if conf.upstream then
-            up = conf.upstream
-        else
-            up = conf
-        end
-
-        local namespace_id = (up.discovery_args and 
up.discovery_args.namespace_id)
-                             or default_namespace_id
-
-        local group_name = (up.discovery_args and up.discovery_args.group_name)
-                           or default_group_name
+    reg.stop_flag = true
+    registries[id] = nil
 
-        local dup = de_duplication(services, namespace_id, group_name,
-                up.service_name, up.scheme)
-        if dup then
-            goto CONTINUE
-        end
-
-        if up.discovery_type == 'nacos' then
-            core.table.insert(services, {
-                service_name = up.service_name,
-                namespace_id = namespace_id,
-                group_name = group_name,
-                scheme = up.scheme,
-            })
+    local dict = get_dict()
+    if dict then
+        local prefix = id .. "/"
+        local all_keys = dict:get_keys(0)
+        for _, key in ipairs(all_keys) do
+            if core.string.has_prefix(key, prefix) then
+                dict:delete(key)
+            end
         end
-        ::CONTINUE::
     end
 end
 
 
-local function get_nacos_services()
-    local services = {}
-
-    -- here we use lazy load to work around circle dependency
-    local get_upstreams = require('apisix.upstream').upstreams
-    local get_routes = require('apisix.router').http_routes
-    local get_stream_routes = require('apisix.router').stream_routes
-    local get_services = require('apisix.http.service').services
-    local values = get_upstreams()
-    iter_and_add_service(services, values)
-    values = get_routes()
-    iter_and_add_service(services, values)
-    values = get_services()
-    iter_and_add_service(services, values)
-    values = get_stream_routes()
-    iter_and_add_service(services, values)
-    return services
-end
-
-local function is_grpc(scheme)
-    if scheme == 'grpc' or scheme == 'grpcs' then
-        return true
-    end
-
-    return false
+function _M.get_registry(id)
+    return registries[id]
 end
 
-local curr_service_in_use = {}
-
-
-local function fetch_from_host(base_uri, username, password, services)
-    local token_param, err = get_token_param(base_uri, username, password)
-    if err then
-        return false, err
-    end
-
-    local service_names = {}
-    local nodes_cache = {}
-    local had_success = false
-
-    for _, service_info in ipairs(services) do
-        local namespace_id = service_info.namespace_id
-        local group_name = service_info.group_name
-        local scheme = service_info.scheme or ''
-        local namespace_param = get_namespace_param(namespace_id)
-        local group_name_param = get_group_name_param(group_name)
-        local signature_param = get_signed_param(group_name, 
service_info.service_name)
-        local query_path = instance_list_path .. service_info.service_name
-                           .. token_param .. namespace_param .. 
group_name_param
-                           .. signature_param
-        local data, req_err = get_url(base_uri, query_path)
-        if req_err then
-            log.error('failed to fetch instances for service [', 
service_info.service_name,
-                      '] from ', base_uri, ', error: ', req_err)
-        else
-            had_success = true
-
-            local key = get_key(namespace_id, group_name, 
service_info.service_name)
-            service_names[key] = true
 
-            local hosts = data.hosts
-            if type(hosts) ~= 'table' then
-                hosts = {}
-            end
-
-            local nodes = {}
-            for _, host in ipairs(hosts) do
-                local node = {
-                    host = host.ip,
-                    port = host.port,
-                    weight = host.weight or default_weight,
-                }
-                -- docs: 
https://github.com/yidongnan/grpc-spring-boot-starter/pull/496
-                if is_grpc(scheme) and host.metadata and 
host.metadata.gRPC_port then
-                    node.port = host.metadata.gRPC_port
-                end
-
-                core.table.insert(nodes, node)
-            end
-
-            if #nodes > 0 then
-                nodes_cache[key] = nodes
-            end
-        end
-    end
+-- ─── Shared helpers ──────────────────────────────────────────────────
 
-    if not had_success then
-        return false, 'all nacos services fetch failed'
+local function match_metadata(node_metadata, upstream_metadata)
+    if upstream_metadata == nil then
+        return true
     end
 
-    for key, nodes in pairs(nodes_cache) do
-        local content = core.json.encode(nodes)
-        nacos_dict:set(key, content)
+    if not node_metadata then
+        node_metadata = {}
     end
 
-    for key, _ in pairs(curr_service_in_use) do
-        if not service_names[key] then
-            nacos_dict:delete(key)
+    for k, v in pairs(upstream_metadata) do
+        if not node_metadata[k] or node_metadata[k] ~= v then
+            return false
         end
     end
 
-    curr_service_in_use = service_names
     return true
 end
 
 
-local function fetch_full_registry(premature)
-    if premature then
-        return
+function _M.get_nodes(key, metadata)
+    local dict = get_dict()
+    if not dict then
+        return nil
     end
 
-    local infos = get_nacos_services()
-    if #infos == 0 then
-        return
+    local value = dict:get(key)
+    if not value then
+        return nil
     end
 
-    local host_list = local_conf.discovery.nacos.host
-    local host_count = #host_list
-    local start = math_random(host_count)
-
-    for i = 0, host_count - 1 do
-        local idx = (start + i - 1) % host_count + 1
-        local base_uri, username, password = get_base_uri_by_index(idx)
+    local nodes = core.json.decode(value)
+    if not metadata then
+        return nodes
+    end
 
-        if not base_uri then
-            log.warn('nacos host at index ', idx, ' is invalid, skip')
-        else
-            local ok, err = fetch_from_host(base_uri, username, password, 
infos)
-            if ok then
-                return
-            end
-            log.error('fetch_from_host: ', base_uri, ' err:', err)
+    local res = {}
+    for _, node in ipairs(nodes) do
+        if match_metadata(node.metadata, metadata) then
+            core.table.insert(res, node)
         end
     end
-
-    log.error('failed to fetch nacos registry from all hosts')
+    return res
 end
 
 
+-- ─── Standard discovery interface ─────────────────────────────────────
+
 function _M.nodes(service_name, discovery_args)
     local namespace_id = discovery_args and
-            discovery_args.namespace_id or default_namespace_id
+            discovery_args.namespace_id or "public"
     local group_name = discovery_args
-            and discovery_args.group_name or default_group_name
-    local key = get_key(namespace_id, group_name, service_name)
-    local value = nacos_dict:get(key)
-    if not value then
-        core.log.error("nacos service not found: ", service_name)
-        return nil
-    end
-    local nodes = core.json.decode(value)
-    return nodes
+            and discovery_args.group_name or "DEFAULT_GROUP"
+
+    local key = "default/" .. namespace_id .. "/" .. group_name .. "/" .. 
service_name
+    return _M.get_nodes(key)
 end
 
 
 function _M.init_worker()
-    default_weight = local_conf.discovery.nacos.weight
-    log.info('default_weight:', default_weight)
-    local fetch_interval = local_conf.discovery.nacos.fetch_interval
-    log.info('fetch_interval:', fetch_interval)
-    access_key = local_conf.discovery.nacos.access_key
-    secret_key = local_conf.discovery.nacos.secret_key
-    ngx_timer_at(0, fetch_full_registry)
-    ngx_timer_every(fetch_interval, fetch_full_registry)
+    local dict = ngx.shared[dict_name]
+    if not dict then
+        error('lua_shared_dict "' .. dict_name .. '" not configured')
+    end
+
+    nacos_dict = dict
+
+    local nacos_conf = local_conf.discovery and local_conf.discovery.nacos
+    if not nacos_conf then
+        return
+    end
+
+    -- shallow copy to avoid mutating cached config
+    local conf = {}
+    for k, v in pairs(nacos_conf) do
+        conf[k] = v
+    end
+    conf.id = "default"
+    local reg = _M.create_registry(conf)
+    _M.start_registry(reg)
 end
 
 
 function _M.dump_data()
-    local keys = nacos_dict:get_keys(0)
+    local dict = get_dict()
+    if not dict then
+        return {services = {}}
+    end
+
+    local keys = dict:get_keys(0)
     local applications = {}
     for _, key in ipairs(keys) do
-        local value = nacos_dict:get(key)
+        local value = dict:get(key)
         if value then
             local nodes = core.json.decode(value)
             if nodes then
diff --git a/t/APISIX.pm b/t/APISIX.pm
index 3ea057123..ad6f76d64 100644
--- a/t/APISIX.pm
+++ b/t/APISIX.pm
@@ -423,6 +423,7 @@ _EOC_
     lua_shared_dict kubernetes-stream 1m;
     lua_shared_dict kubernetes-first-stream 1m;
     lua_shared_dict kubernetes-second-stream 1m;
+    lua_shared_dict nacos-stream 10m;
     lua_shared_dict tars-stream 1m;
 
     upstream apisix_backend {
diff --git a/t/discovery/nacos2.t b/t/discovery/nacos2.t
index 044067ebd..5d57b288e 100644
--- a/t/discovery/nacos2.t
+++ b/t/discovery/nacos2.t
@@ -83,7 +83,7 @@ GET /hello
 --- response_body_like eval
 qr/server [1-2]/
 --- error_log
-error: status = 502
+err:status = 502
 
 
 
@@ -333,7 +333,7 @@ discovery:
 
             local body = json_decode(res.body)
             local services = body.services
-            local service = services["public.DEFAULT_GROUP.APISIX-NACOS"]
+            local service = 
services["default/public/DEFAULT_GROUP/APISIX-NACOS"]
             local number = table.getn(service.nodes)
             ngx.say(number)
         }


Reply via email to