This is an automated email from the ASF dual-hosted git repository.
nic443 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix.git
The following commit(s) were added to refs/heads/master by this push:
new 5962e73e7 refactor: extract reusable building blocks from K8s and Nacos discovery (#13201)
5962e73e7 is described below
commit 5962e73e7cbef1387b061ba6892b8c98e258da45
Author: Nic <[email protected]>
AuthorDate: Tue Apr 14 12:29:35 2026 +0800
refactor: extract reusable building blocks from K8s and Nacos discovery (#13201)
---
apisix/discovery/kubernetes/core.lua | 752 +++++++++++++++++++++++
apisix/discovery/kubernetes/informer_factory.lua | 34 +-
apisix/discovery/kubernetes/init.lua | 679 +-------------------
apisix/discovery/kubernetes/schema.lua | 6 -
apisix/discovery/nacos/client.lua | 365 +++++++++++
apisix/discovery/nacos/init.lua | 517 ++++++----------
t/APISIX.pm | 1 +
t/discovery/nacos2.t | 4 +-
8 files changed, 1381 insertions(+), 977 deletions(-)
diff --git a/apisix/discovery/kubernetes/core.lua b/apisix/discovery/kubernetes/core.lua
new file mode 100644
index 000000000..1ba74408d
--- /dev/null
+++ b/apisix/discovery/kubernetes/core.lua
@@ -0,0 +1,752 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements. See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+--- Reusable building blocks for Kubernetes service discovery.
+--- Extracted from init.lua so that both static-config mode and
+--- dynamic-config mode can share the same core logic.
+
+local ngx = ngx
+local ipairs = ipairs
+local pairs = pairs
+local unpack = unpack
+local string = string
+local tonumber = tonumber
+local tostring = tostring
+local os = os
+local pcall = pcall
+local setmetatable = setmetatable
+
+local core = require("apisix.core")
+local util = require("apisix.cli.util")
+local default_informer_factory = require("apisix.discovery.kubernetes.informer_factory")
+
+
+local _M = {}
+
+local endpoint_buffer = {}
+local kubernetes_service_name_label = "kubernetes.io/service-name"
+
+
+-- ─── helpers ──────────────────────────────────────────────────────────
+
+local function sort_nodes_cmp(left, right)
+ if left.host ~= right.host then
+ return left.host < right.host
+ end
+ return left.port < right.port
+end
+
+
+local function build_endpoint_key(key_prefix, namespace, name)
+ if key_prefix and key_prefix ~= "" then
+ return key_prefix .. "/" .. namespace .. "/" .. name
+ end
+ return namespace .. "/" .. name
+end
+
+
+-- ─── config parsing (exported) ────────────────────────────────────────
+
+function _M.read_env(key)
+ if #key > 3 then
+ local first, second = string.byte(key, 1, 2)
+ if first == string.byte('$') and second == string.byte('{') then
+ local last = string.byte(key, #key)
+ if last == string.byte('}') then
+ local env = string.sub(key, 3, #key - 1)
+ local value = os.getenv(env)
+ if not value then
+ return nil, "not found environment variable " .. env
+ end
+ return value
+ end
+ end
+ end
+ return key
+end
+
+
+function _M.read_token(token_file)
+ local token, err = util.read_file(token_file)
+ if err then
+ return nil, err
+ end
+ return util.trim(token)
+end
+
+
+function _M.get_apiserver(conf)
+ local apiserver = {
+ schema = "",
+ host = "",
+ port = "",
+ }
+
+ apiserver.schema = conf.service.schema
+ if apiserver.schema ~= "http" and apiserver.schema ~= "https" then
+ return nil, "service.schema should set to one of [http,https] but " .. apiserver.schema
+ end
+
+ local err
+ apiserver.host, err = _M.read_env(conf.service.host)
+ if err then
+ return nil, err
+ end
+ if apiserver.host == "" then
+ return nil, "service.host should set to non-empty string"
+ end
+
+ local port
+ port, err = _M.read_env(conf.service.port)
+ if err then
+ return nil, err
+ end
+ apiserver.port = tonumber(port)
+ if not apiserver.port or apiserver.port <= 0 or apiserver.port > 65535 then
+ return nil, "invalid port value: " .. (apiserver.port or "nil")
+ end
+
+ if conf.client.token then
+ local token
+ token, err = _M.read_env(conf.client.token)
+ if err then
+ return nil, err
+ end
+ apiserver.token = util.trim(token)
+ elseif conf.client.token_file and conf.client.token_file ~= "" then
+ setmetatable(apiserver, {
+ __index = function(_, key)
+ if key ~= "token" then
+ return
+ end
+ local token_file
+ token_file, err = _M.read_env(conf.client.token_file)
+ if err then
+ core.log.error("failed to read token file path: ", err)
+ return
+ end
+ local token
+ token, err = _M.read_token(token_file)
+ if err then
+ core.log.error("failed to read token from file: ", err)
+ return ""
+ end
+ core.log.debug("re-read the token value")
+ return token
+ end
+ })
+ else
+ return nil, "one of [client.token,client.token_file] should be set but none"
+ end
+
+ if apiserver.schema == "https" and apiserver.token == "" then
+ return nil, "apiserver.token should set to non-empty string when service.schema is https"
+ end
+
+ -- ssl_verify: use explicit config if set, otherwise default to false
+ if conf.service.ssl_verify ~= nil then
+ apiserver.ssl_verify = conf.service.ssl_verify
+ else
+ apiserver.ssl_verify = false
+ end
+
+ return apiserver
+end
+
+
+function _M.setup_namespace_selector(conf, informer)
+ local ns = conf.namespace_selector
+ if ns == nil then
+ informer.namespace_selector = nil
+ return
+ end
+
+ if ns.equal then
+ informer.field_selector = "metadata.namespace=" .. ns.equal
+ informer.namespace_selector = nil
+ return
+ end
+
+ if ns.not_equal then
+ informer.field_selector = "metadata.namespace!=" .. ns.not_equal
+ informer.namespace_selector = nil
+ return
+ end
+
+ if ns.match then
+ informer.namespace_selector = function(self, namespace)
+ local match = conf.namespace_selector.match
+ local m, err
+ for _, v in ipairs(match) do
+ m, err = ngx.re.match(namespace, v, "jo")
+ if m and m[0] == namespace then
+ return true
+ end
+ if err then
+ core.log.error("ngx.re.match failed: ", err)
+ end
+ end
+ return false
+ end
+ return
+ end
+
+ if ns.not_match then
+ informer.namespace_selector = function(self, namespace)
+ local not_match = conf.namespace_selector.not_match
+ local m, err
+ for _, v in ipairs(not_match) do
+ m, err = ngx.re.match(namespace, v, "jo")
+ if m and m[0] == namespace then
+ return false
+ end
+ if err then
+ return false
+ end
+ end
+ return true
+ end
+ return
+ end
+end
+
+
+function _M.setup_label_selector(conf, informer)
+ informer.label_selector = conf.label_selector
+end
+
+
+-- ─── endpoint dict operations (exported) ──────────────────────────────
+
+function _M.update_endpoint_dict(handle, endpoints, endpoint_key)
+ local endpoint_content = core.json.encode(endpoints, true)
+ local endpoint_version = ngx.crc32_long(endpoint_content)
+ local _, err
+ _, err = handle.endpoint_dict:safe_set(endpoint_key .. "#version", endpoint_version)
+ if err then
+ return false, "set endpoint version into discovery DICT failed, " .. err
+ end
+ _, err = handle.endpoint_dict:safe_set(endpoint_key, endpoint_content)
+ if err then
+ handle.endpoint_dict:delete(endpoint_key .. "#version")
+ return false, "set endpoint into discovery DICT failed, " .. err
+ end
+ return true
+end
+
+
+function _M.create_endpoint_lrucache(endpoint_dict, endpoint_key, endpoint_port)
+ local endpoint_content = endpoint_dict:get(endpoint_key)
+ if not endpoint_content then
+ core.log.error("get empty endpoint content from discovery DIC, this should not happen ",
+ endpoint_key)
+ return nil
+ end
+
+ local endpoint = core.json.decode(endpoint_content)
+ if not endpoint then
+ core.log.error("decode endpoint content failed, this should not happen, content: ",
+ endpoint_content)
+ return nil
+ end
+
+ return endpoint[endpoint_port]
+end
+
+
+-- ─── endpoint callback factory ────────────────────────────────────────
+--- Create a set of informer callbacks parameterized by options.
+---
+--- options:
+--- key_prefix (string|nil) prefix for endpoint dict keys, used for
+--- multi-registry isolation on a shared dict.
+--- duplicate_port_number (bool|nil) when true, store nodes under the numeric
+--- port key in addition to the port name key.
+---
+--- Returns a table: {
+--- on_endpoint_modified, on_endpoint_deleted,
+--- on_endpoint_slices_modified, on_endpoint_slices_deleted,
+--- pre_list, post_list
+--- }
+
+function _M.create_endpoint_callbacks(options)
+ options = options or {}
+ local key_prefix = options.key_prefix
+ local dup_port = options.duplicate_port_number
+
+ -- ── EndpointSlice helpers ──
+
+ local function update_endpoint_slices_cache(handle, endpoint_key, slice, slice_name)
+ if not handle.endpoint_slices_cache[endpoint_key] then
+ handle.endpoint_slices_cache[endpoint_key] = {}
+ end
+ handle.endpoint_slices_cache[endpoint_key][slice_name] = slice
+ end
+
+ local function get_endpoints_from_cache(handle, endpoint_key)
+ local endpoint_slices = handle.endpoint_slices_cache[endpoint_key] or {}
+ local endpoints = {}
+ for _, endpoint_slice in pairs(endpoint_slices) do
+ for port, targets in pairs(endpoint_slice) do
+ if not endpoints[port] then
+ endpoints[port] = core.table.new(0, #targets)
+ end
+ core.table.insert_tail(endpoints[port], unpack(targets))
+ end
+ end
+ return endpoints
+ end
+
+ local function validate_endpoint_slice(endpoint_slice)
+ if not endpoint_slice.metadata then
+ return false, "endpoint_slice has no metadata, endpointSlice: "
+ .. core.json.encode(endpoint_slice)
+ end
+ if not endpoint_slice.metadata.name then
+ return false, "endpoint_slice has no metadata.name, endpointSlice: "
+ .. core.json.encode(endpoint_slice)
+ end
+ if not endpoint_slice.metadata.namespace then
+ return false, "endpoint_slice has no metadata.namespace, endpointSlice: "
+ .. core.json.encode(endpoint_slice)
+ end
+ if not endpoint_slice.metadata.labels
+ or not endpoint_slice.metadata.labels[kubernetes_service_name_label] then
+ return false, "endpoint_slice has no service-name, endpointSlice: "
+ .. core.json.encode(endpoint_slice)
+ end
+ return true
+ end
+
+ -- ── callbacks ──
+
+ local function on_endpoint_slices_modified(handle, endpoint_slice, operate)
+ local ok, err = validate_endpoint_slice(endpoint_slice)
+ if not ok then
+ core.log.error("endpoint_slice validation fail: ", err)
+ return
+ end
+ if handle.namespace_selector and
+ not handle:namespace_selector(endpoint_slice.metadata.namespace) then
+ return
+ end
+
+ core.log.debug("get endpoint_slice: ", core.json.delay_encode(endpoint_slice))
+ local port_to_nodes = {}
+
+ local slice_endpoints = endpoint_slice.endpoints
+ if not slice_endpoints or slice_endpoints == ngx.null then
+ slice_endpoints = {}
+ end
+
+ for _, ep in ipairs(slice_endpoints) do
+ if ep.addresses and ep.conditions and ep.conditions.ready then
+ local addresses = ep.addresses
+ for _, port in ipairs(endpoint_slice.ports or {}) do
+ local port_name
+ if port.name then
+ port_name = port.name
+ elseif port.targetPort then
+ port_name = tostring(port.targetPort)
+ else
+ port_name = tostring(port.port)
+ end
+
+ local nodes = port_to_nodes[port_name]
+ if nodes == nil then
+ nodes = core.table.new(0, #slice_endpoints * #addresses)
+ port_to_nodes[port_name] = nodes
+ end
+
+ for _, ip in ipairs(addresses) do
+ core.table.insert(nodes, {
+ host = ip,
+ port = port.port,
+ weight = handle.default_weight
+ })
+ end
+
+ if dup_port and port.name then
+ port_to_nodes[tostring(port.port)] = core.table.deepcopy(nodes)
+ end
+ end
+ end
+ end
+
+ local svc_name = endpoint_slice.metadata.labels[kubernetes_service_name_label]
+ local endpoint_key = build_endpoint_key(
+ key_prefix, endpoint_slice.metadata.namespace, svc_name)
+ update_endpoint_slices_cache(
+ handle, endpoint_key, port_to_nodes, endpoint_slice.metadata.name)
+
+ local cached_endpoints = get_endpoints_from_cache(handle, endpoint_key)
+ for _, nodes in pairs(cached_endpoints) do
+ core.table.sort(nodes, sort_nodes_cmp)
+ end
+
+ ok, err = _M.update_endpoint_dict(handle, cached_endpoints, endpoint_key)
+ if not ok then
+ core.log.error("failed to update endpoint dict for endpoint: ", endpoint_key,
+ ", err: ", err)
+ return
+ end
+ if operate == "list" then
+ handle.current_keys_hash[endpoint_key] = true
+ handle.current_keys_hash[endpoint_key .. "#version"] = true
+ end
+ end
+
+ local function on_endpoint_slices_deleted(handle, endpoint_slice)
+ local ok, err = validate_endpoint_slice(endpoint_slice)
+ if not ok then
+ core.log.error("endpoint_slice validation fail: ", err)
+ return
+ end
+ if handle.namespace_selector and
+ not handle:namespace_selector(endpoint_slice.metadata.namespace) then
+ return
+ end
+
+ core.log.debug("delete endpoint_slice: ", core.json.delay_encode(endpoint_slice))
+
+ local svc_name = endpoint_slice.metadata.labels[kubernetes_service_name_label]
+ local endpoint_key = build_endpoint_key(
+ key_prefix, endpoint_slice.metadata.namespace, svc_name)
+ update_endpoint_slices_cache(handle, endpoint_key, nil, endpoint_slice.metadata.name)
+
+ local cached_endpoints = get_endpoints_from_cache(handle, endpoint_key)
+ for _, nodes in pairs(cached_endpoints) do
+ core.table.sort(nodes, sort_nodes_cmp)
+ end
+
+ ok, err = _M.update_endpoint_dict(handle, cached_endpoints, endpoint_key)
+ if not ok then
+ core.log.error("failed to update endpoint dict for endpoint: ", endpoint_key,
+ ", err: ", err)
+ end
+ end
+
+ local function on_endpoint_modified(handle, endpoint, operate)
+ if not endpoint or not endpoint.metadata
+ or not endpoint.metadata.namespace or not endpoint.metadata.name then
+ core.log.warn("skipping endpoint with missing metadata: ",
+ core.json.delay_encode(endpoint))
+ return
+ end
+
+ if handle.namespace_selector and
+ not handle:namespace_selector(endpoint.metadata.namespace) then
+ return
+ end
+
+ core.log.debug(core.json.delay_encode(endpoint))
+ core.table.clear(endpoint_buffer)
+
+ local subsets = endpoint.subsets
+ for _, subset in ipairs(subsets or {}) do
+ if subset.addresses then
+ local addresses = subset.addresses
+ for _, port in ipairs(subset.ports or {}) do
+ local port_name
+ if port.name then
+ port_name = port.name
+ elseif port.targetPort then
+ port_name = tostring(port.targetPort)
+ else
+ port_name = tostring(port.port)
+ end
+
+ local nodes = endpoint_buffer[port_name]
+ if nodes == nil then
+ nodes = core.table.new(0, #subsets * #addresses)
+ endpoint_buffer[port_name] = nodes
+ end
+
+ for _, address in ipairs(subset.addresses) do
+ core.table.insert(nodes, {
+ host = address.ip,
+ port = port.port,
+ weight = handle.default_weight
+ })
+ end
+
+ if dup_port and port.name then
+ endpoint_buffer[tostring(port.port)] = core.table.deepcopy(nodes)
+ end
+ end
+ end
+ end
+
+ for _, nodes in pairs(endpoint_buffer) do
+ core.table.sort(nodes, sort_nodes_cmp)
+ end
+
+ local endpoint_key = build_endpoint_key(
+ key_prefix, endpoint.metadata.namespace, endpoint.metadata.name)
+ local ok, err = _M.update_endpoint_dict(handle, endpoint_buffer, endpoint_key)
+ if not ok then
+ core.log.error("failed to update endpoint dict for endpoint: ", endpoint_key,
+ ", err: ", err)
+ return
+ end
+ if operate == "list" then
+ handle.current_keys_hash[endpoint_key] = true
+ handle.current_keys_hash[endpoint_key .. "#version"] = true
+ end
+ end
+
+ local function on_endpoint_deleted(handle, endpoint)
+ if not endpoint or not endpoint.metadata
+ or not endpoint.metadata.namespace or not endpoint.metadata.name then
+ core.log.warn("skipping endpoint deletion with missing metadata: ",
+ core.json.delay_encode(endpoint))
+ return
+ end
+
+ if handle.namespace_selector and
+ not handle:namespace_selector(endpoint.metadata.namespace) then
+ return
+ end
+
+ core.log.debug(core.json.delay_encode(endpoint))
+ local endpoint_key = build_endpoint_key(
+ key_prefix, endpoint.metadata.namespace, endpoint.metadata.name)
+ handle.endpoint_dict:delete(endpoint_key .. "#version")
+ handle.endpoint_dict:delete(endpoint_key)
+ end
+
+ -- pre_list / post_list are prefix-aware: when key_prefix is set,
+ -- only keys belonging to this prefix are considered for dirty-data cleanup.
+ local function pre_list(handle)
+ handle.current_keys_hash = {}
+ local all_keys = handle.endpoint_dict:get_keys(0)
+ if key_prefix and key_prefix ~= "" then
+ handle.existing_keys = {}
+ local prefix = key_prefix .. "/"
+ for _, key in ipairs(all_keys) do
+ if core.string.has_prefix(key, prefix)
+ or key == "discovery_ready:" .. key_prefix then
+ core.table.insert(handle.existing_keys, key)
+ end
+ end
+ else
+ handle.existing_keys = all_keys
+ end
+ if handle.endpoint_slices_cache then
+ handle.endpoint_slices_cache = {}
+ end
+ end
+
+ local function post_list(handle)
+ if handle.existing_keys and handle.current_keys_hash then
+ for _, key in ipairs(handle.existing_keys) do
+ if not handle.current_keys_hash[key] then
+ core.log.info("kubernetes discovery module found dirty data in shared dict, ",
+ "key: ", key)
+ handle.endpoint_dict:delete(key)
+ end
+ end
+ handle.existing_keys = nil
+ handle.current_keys_hash = nil
+ end
+ local ready_key = (key_prefix and key_prefix ~= "")
+ and ("discovery_ready:" .. key_prefix)
+ or "discovery_ready"
+ local _, err = handle.endpoint_dict:safe_set(ready_key, true)
+ if err then
+ core.log.error("set discovery_ready flag into discovery DICT failed, ", err)
+ end
+ end
+
+ return {
+ on_endpoint_modified = on_endpoint_modified,
+ on_endpoint_deleted = on_endpoint_deleted,
+ on_endpoint_slices_modified = on_endpoint_slices_modified,
+ on_endpoint_slices_deleted = on_endpoint_slices_deleted,
+ pre_list = pre_list,
+ post_list = post_list,
+ }
+end
+
+
+-- ─── handle factory ───────────────────────────────────────────────────
+--- Create a fully configured Kubernetes discovery handle.
+---
+--- conf: standard kubernetes discovery config (service, client,
+--- namespace_selector, label_selector, default_weight,
+--- watch_endpoint_slices)
+---
+--- options:
+--- endpoint_dict (ngx.shared.DICT) required — the shared dict to use
+--- key_prefix (string|nil) prefix for endpoint keys
+--- duplicate_port_number (bool|nil) store nodes under numeric port too
+--- informer_factory (table|nil) custom informer factory module
+
+function _M.create_handle(conf, options)
+ local endpoint_dict = options.endpoint_dict
+ if not endpoint_dict then
+ return nil, "endpoint_dict is required"
+ end
+
+ local apiserver, err = _M.get_apiserver(conf)
+ if err then
+ return nil, err
+ end
+
+ local default_weight = conf.default_weight or 50
+
+ local inf_factory = options.informer_factory or default_informer_factory
+ local endpoints_informer
+ if conf.watch_endpoint_slices then
+ endpoints_informer, err = inf_factory.new(
+ "discovery.k8s.io", "v1", "EndpointSlice", "endpointslices", "")
+ else
+ endpoints_informer, err = inf_factory.new("", "v1", "Endpoints", "endpoints", "")
+ end
+ if err then
+ return nil, err
+ end
+
+ _M.setup_namespace_selector(conf, endpoints_informer)
+ _M.setup_label_selector(conf, endpoints_informer)
+
+ local cbs = _M.create_endpoint_callbacks({
+ key_prefix = options.key_prefix,
+ duplicate_port_number = options.duplicate_port_number,
+ })
+
+ if conf.watch_endpoint_slices then
+ endpoints_informer.on_added = cbs.on_endpoint_slices_modified
+ endpoints_informer.on_modified = cbs.on_endpoint_slices_modified
+ endpoints_informer.on_deleted = cbs.on_endpoint_slices_deleted
+ endpoints_informer.endpoint_slices_cache = {}
+ else
+ endpoints_informer.on_added = cbs.on_endpoint_modified
+ endpoints_informer.on_modified = cbs.on_endpoint_modified
+ endpoints_informer.on_deleted = cbs.on_endpoint_deleted
+ end
+
+ endpoints_informer.pre_list = cbs.pre_list
+ endpoints_informer.post_list = cbs.post_list
+
+ local handle = setmetatable({
+ endpoint_dict = endpoint_dict,
+ apiserver = apiserver,
+ default_weight = default_weight,
+ }, { __index = endpoints_informer })
+
+ return handle
+end
+
+
+-- ─── lifecycle ────────────────────────────────────────────────────────
+
+function _M.start_fetch(handle)
+ local timer_runner
+ timer_runner = function(premature)
+ if premature then
+ return
+ end
+ if handle.stop then
+ core.log.info("stop fetching, kind: ", handle.kind)
+ return
+ end
+
+ local ok, status = pcall(handle.list_watch, handle, handle.apiserver)
+
+ local retry_interval = 0
+ if not ok then
+ core.log.error("list_watch failed, kind: ", handle.kind,
+ ", reason: ", "RuntimeException", ", message : ", status)
+ retry_interval = 40
+ elseif not status then
+ retry_interval = 40
+ end
+
+ if not handle.stop then
+ ngx.timer.at(retry_interval, timer_runner)
+ end
+ end
+ ngx.timer.at(0, timer_runner)
+end
+
+
+-- ─── node resolution ──────────────────────────────────────────────────
+--- Resolve service_name to upstream nodes from a shared dict with LRU cache.
+---
+--- endpoint_lrucache: core.lrucache instance
+--- service_name: the full service name (e.g. "ns/svc:port" or "id/ns/svc:port")
+--- pattern: regex to parse service_name; must capture (endpoint_key_group, port)
+--- where endpoint_key_group is used as-is for dict lookup
+--- dict_resolver: function(match) → endpoint_dict, endpoint_key, endpoint_port
+--- returns the dict, key, and port to look up
+
+function _M.resolve_nodes(endpoint_lrucache, service_name, pattern, dict_resolver)
+ local match = ngx.re.match(service_name, pattern, "jo")
+ if not match then
+ core.log.error("get unexpected upstream service_name: ", service_name)
+ return nil
+ end
+
+ local endpoint_dict, endpoint_key, endpoint_port = dict_resolver(match)
+ if not endpoint_dict then
+ core.log.error("failed to resolve endpoint dict for service: ", service_name)
+ return nil
+ end
+
+ local endpoint_version = endpoint_dict:get(endpoint_key .. "#version")
+ if not endpoint_version then
+ core.log.info("get empty endpoint version from discovery DICT ", endpoint_key)
+ return nil
+ end
+
+ return endpoint_lrucache(service_name, endpoint_version,
+ _M.create_endpoint_lrucache, endpoint_dict, endpoint_key, endpoint_port)
+end
+
+
+-- ─── dict helpers ─────────────────────────────────────────────────────
+
+function _M.dump_endpoints_from_dict(endpoint_dict)
+ local keys, err = endpoint_dict:get_keys(0)
+ if err then
+ core.log.error("get keys from discovery dict failed: ", err)
+ return
+ end
+
+ if not keys or #keys == 0 then
+ return
+ end
+
+ local endpoints = {}
+ for i = 1, #keys do
+ local key = keys[i]
+ if key:sub(-#"#version") ~= "#version"
+ and not core.string.has_prefix(key, "discovery_ready") then
+ local value = endpoint_dict:get(key)
+ core.table.insert(endpoints, {
+ name = key,
+ value = value
+ })
+ end
+ end
+
+ return endpoints
+end
+
+
+return _M
diff --git a/apisix/discovery/kubernetes/informer_factory.lua b/apisix/discovery/kubernetes/informer_factory.lua
index 4d650d7bb..0daafef72 100644
--- a/apisix/discovery/kubernetes/informer_factory.lua
+++ b/apisix/discovery/kubernetes/informer_factory.lua
@@ -86,6 +86,9 @@ local function list(httpc, apiserver, informer)
informer.continue = data.metadata.continue
if informer.continue and informer.continue ~= "" then
+ if informer.stop then
+ return true
+ end
list(httpc, apiserver, informer)
end
@@ -199,6 +202,10 @@ end
local function watch(httpc, apiserver, informer)
local watch_times = 8
for _ = 1, watch_times do
+ if informer.stop then
+ return true
+ end
+
local watch_seconds = 1800 + math.random(9, 999)
informer.overtime = watch_seconds
local http_seconds = watch_seconds + 120
@@ -231,6 +238,10 @@ local function watch(httpc, apiserver, informer)
local reason
while true do
+ if informer.stop then
+ return true
+ end
+
body, err = response.body_reader()
if err then
return false, "ReadBodyError", err
@@ -266,15 +277,25 @@ local function list_watch(informer, apiserver)
informer.continue = ""
informer.version = ""
+ if informer.stop then
+ return true
+ end
+
informer.fetch_state = "connecting"
core.log.info("begin to connect ", apiserver.host, ":", apiserver.port)
- ok, message = httpc:connect({
+ local connect_opts = {
scheme = apiserver.schema,
host = apiserver.host,
port = apiserver.port,
- ssl_verify = apiserver.ssl_verify
- })
+ ssl_verify = apiserver.ssl_verify or false,
+ }
+
+ if apiserver.ssl_server_name then
+ connect_opts.ssl_server_name = apiserver.ssl_server_name
+ end
+
+ ok, message = httpc:connect(connect_opts)
if not ok then
informer.fetch_state = "connect failed"
@@ -298,10 +319,14 @@ local function list_watch(informer, apiserver)
end
informer.fetch_state = "list finished"
- if informer.post_list then
+ if informer.post_list and not informer.stop then
informer:post_list()
end
+ if informer.stop then
+ return true
+ end
+
core.log.info("begin to watch ", informer.kind)
informer.fetch_state = "watching"
ok, reason, message = watch(httpc, apiserver, informer)
@@ -370,6 +395,7 @@ function _M.new(group, version, kind, plural, namespace)
overtime = "1800",
version = "",
continue = "",
+ stop = false,
list_watch = list_watch
}
end
diff --git a/apisix/discovery/kubernetes/init.lua b/apisix/discovery/kubernetes/init.lua
index a6cf828e2..f92077923 100644
--- a/apisix/discovery/kubernetes/init.lua
+++ b/apisix/discovery/kubernetes/init.lua
@@ -17,22 +17,15 @@
local ngx = ngx
local type = type
-local unpack = unpack
local ipairs = ipairs
local pairs = pairs
local string = string
-local tonumber = tonumber
-local tostring = tostring
-local os = os
local error = error
-local pcall = pcall
-local setmetatable = setmetatable
local is_http = ngx.config.subsystem == "http"
local process = require("ngx.process")
local core = require("apisix.core")
-local util = require("apisix.cli.util")
local local_conf = require("apisix.core.config_local").local_conf()
-local informer_factory = require("apisix.discovery.kubernetes.informer_factory")
+local k8s_core = require("apisix.discovery.kubernetes.core")
local ctx
@@ -42,501 +35,12 @@ local endpoint_lrucache = core.lrucache.new({
count = 1024
})
-local endpoint_buffer = {}
-local kubernetes_service_name_label = "kubernetes.io/service-name"
-
-local function sort_nodes_cmp(left, right)
- if left.host ~= right.host then
- return left.host < right.host
- end
-
- return left.port < right.port
-end
-
-local function update_endpoint_slices_cache(handle, endpoint_key, slice,
slice_name)
- if not handle.endpoint_slices_cache[endpoint_key] then
- handle.endpoint_slices_cache[endpoint_key] = {}
- end
- local endpoint_slices = handle.endpoint_slices_cache[endpoint_key]
- endpoint_slices[slice_name] = slice
-end
-
-local function get_endpoints_from_cache(handle, endpoint_key)
- local endpoint_slices = handle.endpoint_slices_cache[endpoint_key] or {}
- local endpoints = {}
- for _, endpoint_slice in pairs(endpoint_slices) do
- for port, targets in pairs(endpoint_slice) do
- if not endpoints[port] then
- endpoints[port] = core.table.new(0, #targets)
- end
- core.table.insert_tail(endpoints[port], unpack(targets))
- end
- end
-
- return endpoints
-end
-
-local function update_endpoint_dict(handle, endpoints, endpoint_key)
- local endpoint_content = core.json.encode(endpoints, true)
- local endpoint_version = ngx.crc32_long(endpoint_content)
- local _, err
- _, err = handle.endpoint_dict:safe_set(endpoint_key .. "#version",
endpoint_version)
- if err then
- return false, "set endpoint version into discovery DICT failed, " ..
err
- end
- _, err = handle.endpoint_dict:safe_set(endpoint_key, endpoint_content)
- if err then
- handle.endpoint_dict:delete(endpoint_key .. "#version")
- return false, "set endpoint into discovery DICT failed, " .. err
- end
-
- return true
-end
-
-local function validate_endpoint_slice(endpoint_slice)
- if not endpoint_slice.metadata then
- return false, "endpoint_slice has no metadata, endpointSlice: "
- .. core.json.encode(endpoint_slice)
- end
- if not endpoint_slice.metadata.name then
- return false, "endpoint_slice has no metadata.name, endpointSlice: "
- .. core.json.encode(endpoint_slice)
- end
- if not endpoint_slice.metadata.namespace then
- return false, "endpoint_slice has no metadata.namespace,
endpointSlice: "
- .. core.json.encode(endpoint_slice)
- end
- if not endpoint_slice.metadata.labels
- or not
endpoint_slice.metadata.labels[kubernetes_service_name_label] then
- return false, "endpoint_slice has no service-name, endpointSlice: "
- .. core.json.encode(endpoint_slice)
- end
-
- return true
-end
-
-local function on_endpoint_slices_modified(handle, endpoint_slice, operate)
- local ok, err = validate_endpoint_slice(endpoint_slice)
- if not ok then
- core.log.error("endpoint_slice validation fail: ", err)
- return
- end
- if handle.namespace_selector and
- not handle:namespace_selector(endpoint_slice.metadata.namespace)
then
- return
- end
-
- core.log.debug("get endpoint_slice: ",
core.json.delay_encode(endpoint_slice))
- --record nodes to every port in service
- local port_to_nodes = {}
-
- local slice_endpoints = endpoint_slice.endpoints
- if not slice_endpoints or slice_endpoints == ngx.null then
- slice_endpoints = {}
- end
-
- for _, endpoint in ipairs(slice_endpoints) do
- if endpoint.addresses
- and endpoint.conditions
- and endpoint.conditions.ready then
- local addresses = endpoint.addresses
- for _, port in ipairs(endpoint_slice.ports or {}) do
- local port_name
- if port.name then
- port_name = port.name
- elseif port.targetPort then
- port_name = tostring(port.targetPort)
- else
- port_name = tostring(port.port)
- end
-
- local nodes = port_to_nodes[port_name]
- if nodes == nil then
- nodes = core.table.new(0, #slice_endpoints * #addresses)
- port_to_nodes[port_name] = nodes
- end
-
- for _, ip in ipairs(addresses) do
- core.table.insert(nodes, {
- host = ip,
- port = port.port,
- weight = handle.default_weight
- })
- end
- end
- end
- end
-
- local endpoint_key = endpoint_slice.metadata.namespace
- .. "/" ..
endpoint_slice.metadata.labels[kubernetes_service_name_label]
- update_endpoint_slices_cache(handle, endpoint_key, port_to_nodes,
endpoint_slice.metadata.name)
-
- local cached_endpoints = get_endpoints_from_cache(handle, endpoint_key)
- for _, nodes in pairs(cached_endpoints) do
- core.table.sort(nodes, sort_nodes_cmp)
- end
-
- local ok, err = update_endpoint_dict(handle, cached_endpoints,
endpoint_key)
- if not ok then
- core.log.error("failed to update endpoint dict for endpoint: ",
endpoint_key,
- ", err: ", err)
- return
- end
- if operate == "list" then
- handle.current_keys_hash[endpoint_key] = true
- handle.current_keys_hash[endpoint_key .. "#version"] = true
- end
-end
-
-local function on_endpoint_slices_deleted(handle, endpoint_slice)
- local ok, err = validate_endpoint_slice(endpoint_slice)
- if not ok then
- core.log.error("endpoint_slice validation fail: ", err)
- return
- end
-
- if handle.namespace_selector and
- not handle:namespace_selector(endpoint_slice.metadata.namespace)
then
- return
- end
-
- core.log.debug("delete endpoint_slice: ",
core.json.delay_encode(endpoint_slice))
-
- local endpoint_key = endpoint_slice.metadata.namespace
- .. "/" ..
endpoint_slice.metadata.labels[kubernetes_service_name_label]
- update_endpoint_slices_cache(handle, endpoint_key, nil,
endpoint_slice.metadata.name)
-
- local cached_endpoints = get_endpoints_from_cache(handle, endpoint_key)
- for _, nodes in pairs(cached_endpoints) do
- core.table.sort(nodes, sort_nodes_cmp)
- end
-
- ok, err = update_endpoint_dict(handle, cached_endpoints, endpoint_key)
- if not ok then
- core.log.error("failed to update endpoint dict for endpoint: ",
endpoint_key,
- ", err: ", err)
- end
-end
-
-local function on_endpoint_modified(handle, endpoint, operate)
- if handle.namespace_selector and
- not handle:namespace_selector(endpoint.metadata.namespace) then
- return
- end
-
- core.log.debug(core.json.delay_encode(endpoint))
- core.table.clear(endpoint_buffer)
-
- local subsets = endpoint.subsets
- for _, subset in ipairs(subsets or {}) do
- if subset.addresses then
- local addresses = subset.addresses
- for _, port in ipairs(subset.ports or {}) do
- local port_name
- if port.name then
- port_name = port.name
- elseif port.targetPort then
- port_name = tostring(port.targetPort)
- else
- port_name = tostring(port.port)
- end
-
- local nodes = endpoint_buffer[port_name]
- if nodes == nil then
- nodes = core.table.new(0, #subsets * #addresses)
- endpoint_buffer[port_name] = nodes
- end
-
- for _, address in ipairs(subset.addresses) do
- core.table.insert(nodes, {
- host = address.ip,
- port = port.port,
- weight = handle.default_weight
- })
- end
- end
- end
- end
-
-
- for _, nodes in pairs(endpoint_buffer) do
- core.table.sort(nodes, sort_nodes_cmp)
- end
-
- local endpoint_key = endpoint.metadata.namespace .. "/" ..
endpoint.metadata.name
- local ok, err = update_endpoint_dict(handle, endpoint_buffer, endpoint_key)
- if not ok then
- core.log.error("failed to update endpoint dict for endpoint: ",
endpoint_key,
- ", err: ", err)
- return
- end
- if operate == "list" then
- handle.current_keys_hash[endpoint_key] = true
- handle.current_keys_hash[endpoint_key .. "#version"] = true
- end
-end
-
-
-local function on_endpoint_deleted(handle, endpoint)
- if handle.namespace_selector and
- not handle:namespace_selector(endpoint.metadata.namespace) then
- return
- end
-
- core.log.debug(core.json.delay_encode(endpoint))
- local endpoint_key = endpoint.metadata.namespace .. "/" ..
endpoint.metadata.name
- handle.endpoint_dict:delete(endpoint_key .. "#version")
- handle.endpoint_dict:delete(endpoint_key)
-end
-
-
-local function pre_list(handle)
- handle.current_keys_hash = {}
- handle.existing_keys = handle.endpoint_dict:get_keys(0)
- if handle.endpoint_slices_cache then
- handle.endpoint_slices_cache = {}
- end
-end
-
-
-local function post_list(handle)
- if handle.existing_keys and handle.current_keys_hash then
- for _, key in ipairs(handle.existing_keys) do
- if not handle.current_keys_hash[key] then
- core.log.info("kubernetes discovery module found dirty data in
shared dict, key: ",
- key)
- handle.endpoint_dict:delete(key)
- end
- end
- handle.existing_keys = nil
- handle.current_keys_hash = nil
- end
- local _, err = handle.endpoint_dict:safe_set("discovery_ready", true)
- if err then
- core.log.error("set discovery_ready flag into discovery DICT failed,
", err)
- end
-end
-
-
-local function setup_label_selector(conf, informer)
- informer.label_selector = conf.label_selector
-end
-
-
-local function setup_namespace_selector(conf, informer)
- local ns = conf.namespace_selector
- if ns == nil then
- informer.namespace_selector = nil
- return
- end
-
- if ns.equal then
- informer.field_selector = "metadata.namespace=" .. ns.equal
- informer.namespace_selector = nil
- return
- end
-
- if ns.not_equal then
- informer.field_selector = "metadata.namespace!=" .. ns.not_equal
- informer.namespace_selector = nil
- return
- end
-
- if ns.match then
- informer.namespace_selector = function(self, namespace)
- local match = conf.namespace_selector.match
- local m, err
- for _, v in ipairs(match) do
- m, err = ngx.re.match(namespace, v, "jo")
- if m and m[0] == namespace then
- return true
- end
- if err then
- core.log.error("ngx.re.match failed: ", err)
- end
- end
- return false
- end
- return
- end
-
- if ns.not_match then
- informer.namespace_selector = function(self, namespace)
- local not_match = conf.namespace_selector.not_match
- local m, err
- for _, v in ipairs(not_match) do
- m, err = ngx.re.match(namespace, v, "jo")
- if m and m[0] == namespace then
- return false
- end
- if err then
- return false
- end
- end
- return true
- end
- return
- end
-
- return
-end
-
-
-local function read_env(key)
- if #key > 3 then
- local first, second = string.byte(key, 1, 2)
- if first == string.byte('$') and second == string.byte('{') then
- local last = string.byte(key, #key)
- if last == string.byte('}') then
- local env = string.sub(key, 3, #key - 1)
- local value = os.getenv(env)
- if not value then
- return nil, "not found environment variable " .. env
- end
- return value
- end
- end
- end
- return key
-end
-
-local function read_token(token_file)
- local token, err = util.read_file(token_file)
- if err then
- return nil, err
- end
-
- -- remove possible extra whitespace
- return util.trim(token)
-end
-
-local function get_apiserver(conf)
- local apiserver = {
- schema = "",
- host = "",
- port = "",
- }
-
- apiserver.schema = conf.service.schema
- if apiserver.schema ~= "http" and apiserver.schema ~= "https" then
- return nil, "service.schema should set to one of [http,https] but " ..
apiserver.schema
- end
-
- local err
- apiserver.host, err = read_env(conf.service.host)
- if err then
- return nil, err
- end
-
- if apiserver.host == "" then
- return nil, "service.host should set to non-empty string"
- end
-
- local port
- port, err = read_env(conf.service.port)
- if err then
- return nil, err
- end
-
- apiserver.port = tonumber(port)
- if not apiserver.port or apiserver.port <= 0 or apiserver.port > 65535 then
- return nil, "invalid port value: " .. apiserver.port
- end
-
- if conf.client.token then
- local token, err = read_env(conf.client.token)
- if err then
- return nil, err
- end
- apiserver.token = util.trim(token)
- elseif conf.client.token_file and conf.client.token_file ~= "" then
- setmetatable(apiserver, {
- __index = function(_, key)
- if key ~= "token" then
- return
- end
-
- local token_file, err = read_env(conf.client.token_file)
- if err then
- core.log.error("failed to read token file path: ", err)
- return
- end
-
- local token, err = read_token(token_file)
- if err then
- core.log.error("failed to read token from file: ", err)
- return
- end
- core.log.debug("re-read the token value")
- return token
- end
- })
- else
- return nil, "one of [client.token,client.token_file] should be set but
none"
- end
-
- if apiserver.schema == "https" and apiserver.token == "" then
- return nil, "apiserver.token should set to non-empty string when
service.schema is https"
- end
-
- -- ssl_verify: use explicit config if set, otherwise default to false
- if conf.service.ssl_verify ~= nil then
- apiserver.ssl_verify = conf.service.ssl_verify
- else
- apiserver.ssl_verify = false
- end
-
- return apiserver
-end
-
-local function create_endpoint_lrucache(endpoint_dict, endpoint_key,
endpoint_port)
- local endpoint_content = endpoint_dict:get(endpoint_key)
- if not endpoint_content then
- core.log.error("get empty endpoint content from discovery DIC, this
should not happen ",
- endpoint_key)
- return nil
- end
-
- local endpoint = core.json.decode(endpoint_content)
- if not endpoint then
- core.log.error("decode endpoint content failed, this should not
happen, content: ",
- endpoint_content)
- return nil
- end
-
- return endpoint[endpoint_port]
-end
-
local _M = {
version = "0.0.1"
}
-local function start_fetch(handle)
- local timer_runner
- timer_runner = function(premature)
- if premature then
- return
- end
-
- local ok, status = pcall(handle.list_watch, handle, handle.apiserver)
-
- local retry_interval = 0
- if not ok then
- core.log.error("list_watch failed, kind: ", handle.kind,
- ", reason: ", "RuntimeException", ", message : ", status)
- retry_interval = 40
- elseif not status then
- retry_interval = 40
- end
-
- ngx.timer.at(retry_interval, timer_runner)
- end
- ngx.timer.at(0, timer_runner)
-end
-
-
local function get_endpoint_dict_name(id)
local shm = "kubernetes"
@@ -570,71 +74,26 @@ local function single_mode_init(conf)
return
end
- local apiserver, err = get_apiserver(conf)
- if err then
- error(err)
- return
- end
-
- local default_weight = conf.default_weight
- local endpoints_informer, err
- if conf.watch_endpoint_slices then
- endpoints_informer, err = informer_factory.new("discovery.k8s.io",
"v1",
- "EndpointSlice",
"endpointslices", "")
- else
- endpoints_informer, err = informer_factory.new("", "v1", "Endpoints",
"endpoints", "")
- end
+ local handle, err = k8s_core.create_handle(conf, {
+ endpoint_dict = endpoint_dict,
+ })
if err then
error(err)
return
end
- setup_namespace_selector(conf, endpoints_informer)
- setup_label_selector(conf, endpoints_informer)
-
- if conf.watch_endpoint_slices then
- endpoints_informer.on_added = on_endpoint_slices_modified
- endpoints_informer.on_modified = on_endpoint_slices_modified
- endpoints_informer.on_deleted = on_endpoint_slices_deleted
- endpoints_informer.endpoint_slices_cache = {}
- else
- endpoints_informer.on_added = on_endpoint_modified
- endpoints_informer.on_modified = on_endpoint_modified
- endpoints_informer.on_deleted = on_endpoint_deleted
- end
-
- endpoints_informer.pre_list = pre_list
- endpoints_informer.post_list = post_list
-
- ctx = setmetatable({
- endpoint_dict = endpoint_dict,
- apiserver = apiserver,
- default_weight = default_weight
- }, { __index = endpoints_informer })
-
- start_fetch(ctx)
+ ctx = handle
+ k8s_core.start_fetch(ctx)
end
local function single_mode_nodes(service_name)
- local pattern = "^(.*):(.*)$" -- namespace/name:port_name
- local match = ngx.re.match(service_name, pattern, "jo")
- if not match then
- core.log.error("get unexpected upstream service_name: ", service_name)
- return nil
- end
-
- local endpoint_dict = ctx
- local endpoint_key = match[1]
- local endpoint_port = match[2]
- local endpoint_version = endpoint_dict:get(endpoint_key .. "#version")
- if not endpoint_version then
- core.log.info("get empty endpoint version from discovery DICT ",
endpoint_key)
- return nil
- end
-
- return endpoint_lrucache(service_name, endpoint_version,
- create_endpoint_lrucache, endpoint_dict, endpoint_key,
endpoint_port)
+ return k8s_core.resolve_nodes(
+ endpoint_lrucache, service_name,
+ "^(.*):(.*)$", -- namespace/name:port_name
+ function(match)
+ return ctx, match[1], match[2]
+ end)
end
@@ -678,81 +137,36 @@ local function multiple_mode_init(confs)
"please check your APISIX version")
end
- local apiserver, err = get_apiserver(conf)
- if err then
- error(err)
- return
- end
-
- local default_weight = conf.default_weight
-
- local endpoints_informer, err
- if conf.watch_endpoint_slices then
- endpoints_informer, err = informer_factory.new("discovery.k8s.io",
"v1",
- "EndpointSlice",
"endpointslices", "")
- else
- endpoints_informer, err = informer_factory.new("", "v1",
"Endpoints", "endpoints", "")
- end
+ local handle, err = k8s_core.create_handle(conf, {
+ endpoint_dict = endpoint_dict,
+ })
if err then
error(err)
return
end
- setup_namespace_selector(conf, endpoints_informer)
- setup_label_selector(conf, endpoints_informer)
-
- if conf.watch_endpoint_slices then
- endpoints_informer.on_added = on_endpoint_slices_modified
- endpoints_informer.on_modified = on_endpoint_slices_modified
- endpoints_informer.on_deleted = on_endpoint_slices_deleted
- endpoints_informer.endpoint_slices_cache = {}
- else
- endpoints_informer.on_added = on_endpoint_modified
- endpoints_informer.on_modified = on_endpoint_modified
- endpoints_informer.on_deleted = on_endpoint_deleted
- end
-
- endpoints_informer.pre_list = pre_list
- endpoints_informer.post_list = post_list
-
- ctx[id] = setmetatable({
- endpoint_dict = endpoint_dict,
- apiserver = apiserver,
- default_weight = default_weight
- }, { __index = endpoints_informer })
+ ctx[id] = handle
end
for _, item in pairs(ctx) do
- start_fetch(item)
+ k8s_core.start_fetch(item)
end
end
local function multiple_mode_nodes(service_name)
- local pattern = "^(.*)/(.*/.*):(.*)$" -- id/namespace/name:port_name
- local match = ngx.re.match(service_name, pattern, "jo")
- if not match then
- core.log.error("get unexpected upstream service_name: ", service_name)
- return nil
- end
-
- local id = match[1]
- local endpoint_dict = ctx[id]
- if not endpoint_dict then
- core.log.error("id not exist")
- return nil
- end
-
- local endpoint_key = match[2]
- local endpoint_port = match[3]
- local endpoint_version = endpoint_dict:get(endpoint_key .. "#version")
- if not endpoint_version then
- core.log.info("get empty endpoint version from discovery DICT ",
endpoint_key)
- return nil
- end
-
- return endpoint_lrucache(service_name, endpoint_version,
- create_endpoint_lrucache, endpoint_dict, endpoint_key,
endpoint_port)
+ return k8s_core.resolve_nodes(
+ endpoint_lrucache, service_name,
+ "^(.*)/(.*/.*):(.*)$", -- id/namespace/name:port_name
+ function(match)
+ local id = match[1]
+ local endpoint_dict = ctx[id]
+ if not endpoint_dict then
+ core.log.error("id not exist")
+ return nil
+ end
+ return endpoint_dict, match[2], match[3]
+ end)
end
@@ -769,52 +183,22 @@ function _M.init_worker()
end
-local function dump_endpoints_from_dict(endpoint_dict)
- local keys, err = endpoint_dict:get_keys(0)
- if err then
- core.log.error("get keys from discovery dict failed: ", err)
- return
- end
-
- if not keys or #keys == 0 then
- return
- end
-
- local endpoints = {}
- for i = 1, #keys do
- local key = keys[i]
- -- skip key with suffix #version
- if key:sub(-#"#version") ~= "#version" then
- local value = endpoint_dict:get(key)
- core.table.insert(endpoints, {
- name = key,
- value = value
- })
- end
- end
-
- return endpoints
-end
-
-
function _M.dump_data()
local discovery_conf = local_conf.discovery.kubernetes
local eps = {}
if #discovery_conf == 0 then
- -- Single mode: discovery_conf is a single configuration object
local endpoint_dict = get_endpoint_dict()
- local endpoints = dump_endpoints_from_dict(endpoint_dict)
+ local endpoints = k8s_core.dump_endpoints_from_dict(endpoint_dict)
if endpoints then
core.table.insert(eps, {
endpoints = endpoints
})
end
else
- -- Multiple mode: discovery_conf is an array of configuration objects
for _, conf in ipairs(discovery_conf) do
local endpoint_dict = get_endpoint_dict(conf.id)
- local endpoints = dump_endpoints_from_dict(endpoint_dict)
+ local endpoints = k8s_core.dump_endpoints_from_dict(endpoint_dict)
if endpoints then
core.table.insert(eps, {
id = conf.id,
@@ -836,7 +220,6 @@ local function check_ready(id)
return false, "failed to get lua_shared_dict: " ..
get_endpoint_dict_name(id)
.. ", please check your APISIX version"
end
- -- check flag
local ready = endpoint_dict:get("discovery_ready")
if not ready then
core.log.warn("kubernetes discovery not ready")
diff --git a/apisix/discovery/kubernetes/schema.lua
b/apisix/discovery/kubernetes/schema.lua
index b3f35ae72..0aa39c6bd 100644
--- a/apisix/discovery/kubernetes/schema.lua
+++ b/apisix/discovery/kubernetes/schema.lua
@@ -131,9 +131,6 @@ return {
},
ssl_verify = {
type = "boolean",
- description = "Verify the TLS certificate of the
Kubernetes API " ..
- "server. Defaults to false. Set to
true to enable " ..
- "certificate verification.",
},
},
default = {
@@ -198,9 +195,6 @@ return {
},
ssl_verify = {
type = "boolean",
- description = "Verify the TLS certificate of
the Kubernetes " ..
- "API server. Defaults to false.
Set to true to " ..
- "enable certificate
verification.",
},
},
required = { "host", "port" }
diff --git a/apisix/discovery/nacos/client.lua
b/apisix/discovery/nacos/client.lua
new file mode 100644
index 000000000..3f674d3a1
--- /dev/null
+++ b/apisix/discovery/nacos/client.lua
@@ -0,0 +1,365 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements. See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+--- Reusable HTTP client primitives for Nacos service discovery.
+--- Extracted from init.lua so that both static-config mode and
+--- dynamic-config mode can share the same core logic.
+
+local require = require
+local http = require('resty.http')
+local core = require('apisix.core')
+local ipairs = ipairs
+local type = type
+local ngx = ngx
+local string = string
+local string_sub = string.sub
+local str_byte = string.byte
+local str_find = core.string.find
+local log = core.log
+
+local auth_path = 'auth/login'
+local instance_list_path = 'ns/instance/list?healthyOnly=true&serviceName='
+local default_namespace_id = "public"
+local default_group_name = "DEFAULT_GROUP"
+
+
+local _M = {}
+
+
+-- ─── HTTP primitives ──────────────────────────────────────────────────
+
+--- Send one HTTP request to a Nacos endpoint and decode the JSON reply.
+---
+--- request_uri  base URI; `path` is appended to it verbatim
+--- path         path (and query string) relative to `request_uri`
+--- body         optional payload; a table is JSON-encoded and sent with a
+---              JSON Content-Type, any other value is handed over untouched
+--- method       HTTP method passed to resty.http
+--- basic_auth   optional value for the Authorization header
+--- timeout      optional table { connect, send, read } in ms; missing
+---              entries default to 2000 / 5000 / 5000
+---
+--- Returns the decoded JSON value, or nil plus an error string when the
+--- request fails, the status is not 200, or the body is not valid JSON.
+function _M.request(request_uri, path, body, method, basic_auth, timeout)
+    local target = request_uri .. path
+    log.info('request url:', request_uri, path)
+
+    local headers = { ['Accept'] = 'application/json' }
+    if basic_auth then
+        headers['Authorization'] = basic_auth
+    end
+
+    if body and type(body) == 'table' then
+        local encoded, encode_err = core.json.encode(body)
+        if not encoded then
+            return nil, 'invalid body : ' .. encode_err
+        end
+        body = encoded
+        headers['Content-Type'] = 'application/json'
+    end
+
+    local limits = timeout or {}
+    local httpc = http.new()
+    httpc:set_timeouts(limits.connect or 2000,
+                       limits.send or 5000,
+                       limits.read or 5000)
+
+    -- certificate verification is always requested for this call
+    local res, err = httpc:request_uri(target, {
+        method = method,
+        headers = headers,
+        body = body,
+        ssl_verify = true,
+    })
+    if not res then
+        return nil, err
+    end
+
+    if not res.body or res.status ~= 200 then
+        return nil, 'status = ' .. res.status
+    end
+
+    local data, decode_err = core.json.decode(res.body)
+    if not data then
+        return nil, decode_err
+    end
+    return data
+end
+
+
+-- ─── authentication ───────────────────────────────────────────────────
+
+--- Log in to Nacos and build the access-token query fragment.
+---
+--- Returns '' when either username or password is missing (no login is
+--- attempted), '&accessToken=<token>' after a successful login, or nil
+--- plus an error string when the login request fails or the reply carries
+--- no usable accessToken.
+function _M.get_token_param(base_uri, username, password, timeout)
+    if not (username and password) then
+        return ''
+    end
+
+    local login_path = auth_path .. '?' .. ngx.encode_args({
+        username = username,
+        password = password,
+    })
+    local data, err = _M.request(base_uri, login_path, nil, 'POST', nil, timeout)
+    if err then
+        log.error('nacos login fail:', username, ' desc:', err)
+        return nil, err
+    end
+
+    -- only a table reply can carry the token; anything else counts as missing
+    local token = type(data) == "table" and data.accessToken
+    if not token or token == "" then
+        return nil, 'nacos login response missing accessToken'
+    end
+    return '&accessToken=' .. token
+end
+
+
+--- Build the AK/SK signature query fragment for an instance-list request.
+---
+--- Returns '' unless both access_key and secret_key are non-empty.
+--- Otherwise returns '&' followed by the encoded args `ak`, `data` and
+--- `signature`, where data is "<ngx.now() * 1000>@@<group_name>@@<service_name>"
+--- and signature is the base64 of HMAC-SHA1(secret_key, data).
+--- group_name and service_name are concatenated directly, so they must be
+--- non-nil whenever both keys are set.
+function _M.get_signed_param(group_name, service_name, access_key, secret_key)
+    if not access_key or access_key == '' or not secret_key or secret_key == '' then
+        return ''
+    end
+
+    local str_to_sign = ngx.now() * 1000 .. '@@' .. group_name .. '@@' .. service_name
+    local digest = ngx.hmac_sha1(secret_key, str_to_sign)
+    return '&' .. ngx.encode_args({
+        ak = access_key,
+        data = str_to_sign,
+        signature = ngx.encode_base64(digest),
+    })
+end
+
+
+-- ─── URL building ─────────────────────────────────────────────────────
+
+--- Normalise a configured Nacos host URL into a base URI plus credentials.
+---
+--- When the URL contains '@', the text between '://' and the position
+--- reported by core.string.rfind_char(url, '@') is treated as
+--- "username:password" and removed from the URL; without a ':' in that
+--- text both credentials stay nil. `prefix`, when given, is appended, and
+--- a trailing '/' is added if the result does not already end with one.
+---
+--- Returns url, username, password; returns nil when an '@' is present but
+--- no '://' precedes it.
+function _M.build_base_uri(url, prefix)
+    local username, password
+    local at_pos = core.string.rfind_char(url, '@')
+    if at_pos then
+        local scheme_pos = str_find(url, '://')
+        if not scheme_pos or scheme_pos >= at_pos then
+            return nil
+        end
+
+        local credentials = string_sub(url, scheme_pos + 3, at_pos - 1)
+        local sep = str_find(credentials, ':')
+        if sep then
+            username = string_sub(credentials, 1, sep - 1)
+            password = string_sub(credentials, sep + 1)
+        end
+
+        -- keep "<scheme>://" and everything after the '@'
+        url = string_sub(url, 1, scheme_pos + 2) .. string_sub(url, at_pos + 1)
+    end
+
+    if prefix then
+        url = url .. prefix
+    end
+
+    local slash = str_byte('/')
+    if str_byte(url, #url) ~= slash then
+        url = url .. '/'
+    end
+
+    return url, username, password
+end
+
+
+-- ─── query param helpers ──────────────────────────────────────────────
+
+--- Query fragment selecting a namespace: '&namespaceId=<encoded id>',
+--- or '' when namespace_id is nil/false.
+local function get_namespace_param(namespace_id)
+    if not namespace_id then
+        return ''
+    end
+    return '&' .. ngx.encode_args({ namespaceId = namespace_id })
+end
+
+
+--- Query fragment selecting a group: '&groupName=<encoded name>',
+--- or '' when group_name is nil/false.
+local function get_group_name_param(group_name)
+    if not group_name then
+        return ''
+    end
+    return '&' .. ngx.encode_args({ groupName = group_name })
+end
+
+
+--- True when the upstream scheme is 'grpc' or 'grpcs'.
+local function is_grpc(scheme)
+    if scheme == 'grpc' then
+        return true
+    end
+    return scheme == 'grpcs'
+end
+
+
+-- ─── instance fetching ────────────────────────────────────────────────
+
+--- Fetch instances from a single nacos host for a list of services.
+---
+--- Returns: nodes_cache (table of key → nodes), service_names (set of key → true)
+--- On failure: nil, nil, err_string (login failed, or no service could be fetched)
+---
+--- services: array of { service_name, namespace_id, group_name, scheme }.
+--- A nil namespace_id / group_name is left out of the query string; the
+--- default key and the signature then use "public" / "DEFAULT_GROUP".
+---
+--- options:
+---   default_weight    (number)   default node weight (100 when unset)
+---   access_key        (string)   AK for HMAC-SHA1 signing (optional)
+---   secret_key        (string)   SK for HMAC-SHA1 signing (optional)
+---   timeout           (table)    { connect, send, read } in ms
+---   preserve_metadata (bool)     include instance.metadata in returned nodes
+---   key_builder       (function) key_builder(namespace_id, group_name, service_name)
+---                                returns the key to use for this service in the
+---                                result; it receives the values exactly as given
+---                                in `services`. default: ns_id.group.service
+
+function _M.fetch_from_host(base_uri, username, password, services, options)
+    options = options or {}
+    local dw = options.default_weight or 100
+    local ak = options.access_key
+    local sk = options.secret_key
+    local timeout = options.timeout
+    local preserve_metadata = options.preserve_metadata
+    local key_builder = options.key_builder
+
+    local token_param, err = _M.get_token_param(base_uri, username, password, timeout)
+    if err then
+        return nil, nil, err
+    end
+
+    local service_names = {}
+    local nodes_cache = {}
+    local had_success = false
+
+    for _, service_info in ipairs(services) do
+        local namespace_id = service_info.namespace_id
+        local group_name = service_info.group_name
+        local service_name = service_info.service_name
+        local scheme = service_info.scheme or ''
+
+        -- the signing string concatenates the group name, so it must not be nil
+        local signature_param = _M.get_signed_param(
+            group_name or default_group_name, service_name, ak, sk)
+        local query_path = instance_list_path .. service_name
+                           .. token_param
+                           .. get_namespace_param(namespace_id)
+                           .. get_group_name_param(group_name)
+                           .. signature_param
+
+        local data, req_err = _M.request(base_uri, query_path, nil, 'GET', nil, timeout)
+        if req_err then
+            log.error('get_url:', query_path, ' err:', req_err)
+        elseif type(data) ~= 'table' then
+            -- valid JSON that is not an object (e.g. a bare number or boolean)
+            -- cannot carry a hosts list; count it as a failed fetch instead of
+            -- raising an error while indexing it
+            log.error('get_url:', query_path,
+                      ' err: unexpected response type ', type(data))
+        else
+            had_success = true
+
+            local key
+            if key_builder then
+                key = key_builder(namespace_id, group_name, service_name)
+            else
+                -- nil-safe: same defaults the service scanner applies
+                key = (namespace_id or default_namespace_id) .. '.'
+                      .. (group_name or default_group_name) .. '.'
+                      .. service_name
+            end
+            service_names[key] = true
+
+            local hosts = data.hosts
+            if type(hosts) ~= 'table' then
+                hosts = {}
+            end
+
+            local nodes = {}
+            for _, host in ipairs(hosts) do
+                local node = {
+                    host = host.ip,
+                    port = host.port,
+                    weight = host.weight or dw,
+                }
+                -- gRPC upstreams use the port advertised in instance metadata
+                if is_grpc(scheme) and host.metadata and host.metadata.gRPC_port then
+                    node.port = host.metadata.gRPC_port
+                end
+                if preserve_metadata and host.metadata then
+                    node.metadata = host.metadata
+                end
+                core.table.insert(nodes, node)
+            end
+
+            nodes_cache[key] = nodes
+        end
+    end
+
+    if not had_success then
+        return nil, nil, 'all nacos services fetch failed'
+    end
+
+    return nodes_cache, service_names
+end
+
+
+-- ─── service scanning ─────────────────────────────────────────────────
+
+--- Report whether `services` already holds an entry with the same
+--- namespace_id, group_name, service_name and scheme.
+local function de_duplication(services, namespace_id, group_name, service_name, scheme)
+    for _, known in ipairs(services) do
+        local same_target = known.namespace_id == namespace_id
+                            and known.group_name == group_name
+                            and known.service_name == service_name
+        if same_target and known.scheme == scheme then
+            return true
+        end
+    end
+    return false
+end
+
+
+--- Append to `services` every nacos-backed upstream found in `values`.
+---
+--- Each item's `value` is used as the upstream itself, or its `upstream`
+--- field when present. Items are skipped when they have no value, when
+--- discovery_type is not 'nacos', when `filter` is given and returns a
+--- falsy result, or when an identical entry is already in `services`.
+--- Missing discovery_args.namespace_id / group_name fall back to
+--- default_namespace_id / default_group_name.
+local function iter_and_add_service(services, values, filter)
+    if not values then
+        return
+    end
+
+    for _, item in core.config_util.iterate_values(values) do
+        local conf = item.value
+        local up = conf and (conf.upstream or conf)
+
+        if up and up.discovery_type == 'nacos' and (not filter or filter(up)) then
+            local args = up.discovery_args
+            local namespace_id = (args and args.namespace_id) or default_namespace_id
+            local group_name = (args and args.group_name) or default_group_name
+
+            local seen = de_duplication(services, namespace_id, group_name,
+                                        up.service_name, up.scheme)
+            if not seen then
+                core.table.insert(services, {
+                    service_name = up.service_name,
+                    namespace_id = namespace_id,
+                    group_name = group_name,
+                    scheme = up.scheme,
+                })
+            end
+        end
+    end
+end
+
+
+--- Collect the nacos services referenced by APISIX configuration.
+--- Sources are visited in this order: upstreams, http routes, services,
+--- stream routes; duplicates are dropped.
+--- filter: optional function(upstream) → bool, called on each upstream config.
+function _M.get_nacos_services(filter)
+    -- lazy load to work around circular dependency
+    local upstream_mod = require('apisix.upstream')
+    local router = require('apisix.router')
+    local service_mod = require('apisix.http.service')
+
+    local getters = {
+        upstream_mod.upstreams,
+        router.http_routes,
+        service_mod.services,
+        router.stream_routes,
+    }
+
+    local services = {}
+    -- fixed count so that every getter is invoked, in order
+    for i = 1, 4 do
+        iter_and_add_service(services, getters[i](), filter)
+    end
+    return services
+end
+
+
+return _M
diff --git a/apisix/discovery/nacos/init.lua b/apisix/discovery/nacos/init.lua
index 74875a6a9..1c5087061 100644
--- a/apisix/discovery/nacos/init.lua
+++ b/apisix/discovery/nacos/init.lua
@@ -17,418 +17,301 @@
local require = require
local local_conf = require('apisix.core.config_local').local_conf()
-local http = require('resty.http')
local core = require('apisix.core')
+local nacos_client = require('apisix.discovery.nacos.client')
+local is_http = ngx.config.subsystem == "http"
local ipairs = ipairs
local pairs = pairs
-local type = type
+local error = error
local math_random = math.random
local ngx = ngx
-local ngx_re = require('ngx.re')
local ngx_timer_at = ngx.timer.at
-local ngx_timer_every = ngx.timer.every
-local string = string
-local string_sub = string.sub
-local str_byte = string.byte
-local str_find = core.string.find
local log = core.log
-local default_weight
-local nacos_dict = ngx.shared.nacos --key: namespace_id.group_name.service_name
-if not nacos_dict then
- error("lua_shared_dict \"nacos\" not configured")
-end
-
-local auth_path = 'auth/login'
-local instance_list_path = 'ns/instance/list?healthyOnly=true&serviceName='
-local default_namespace_id = "public"
-local default_group_name = "DEFAULT_GROUP"
-local access_key
-local secret_key
-
-
local _M = {}
-local function get_key(namespace_id, group_name, service_name)
- return namespace_id .. '.' .. group_name .. '.' .. service_name
-end
-
-local function request(request_uri, path, body, method, basic_auth)
- local url = request_uri .. path
- log.info('request url:', url)
- local headers = {}
- headers['Accept'] = 'application/json'
+local nacos_dict
+local registries = {}
- if basic_auth then
- headers['Authorization'] = basic_auth
- end
+local dict_name = is_http and "nacos" or "nacos-stream"
- if body and 'table' == type(body) then
- local err
- body, err = core.json.encode(body)
- if not body then
- return nil, 'invalid body : ' .. err
- end
- headers['Content-Type'] = 'application/json'
+local function get_dict()
+ if not nacos_dict then
+ nacos_dict = ngx.shared[dict_name]
end
-
- local httpc = http.new()
- local timeout = local_conf.discovery.nacos.timeout
- local connect_timeout = timeout.connect
- local send_timeout = timeout.send
- local read_timeout = timeout.read
- log.info('connect_timeout:', connect_timeout, ', send_timeout:', send_timeout,
- ', read_timeout:', read_timeout)
- httpc:set_timeouts(connect_timeout, send_timeout, read_timeout)
- local res, err = httpc:request_uri(url, {
- method = method,
- headers = headers,
- body = body,
- ssl_verify = true,
- })
- if not res then
- return nil, err
- end
-
- if not res.body or res.status ~= 200 then
- return nil, 'status = ' .. res.status
- end
-
- local json_str = res.body
- local data, err = core.json.decode(json_str)
- if not data then
- return nil, err
- end
- return data
+ return nacos_dict
end
-local function get_url(request_uri, path)
- return request(request_uri, path, nil, 'GET', nil)
+local function default_key_builder(id)
+ return function(namespace_id, group_name, service_name)
+ return id .. "/" .. namespace_id .. "/" .. group_name .. "/" .. service_name
+ end
end
-local function post_url(request_uri, path, body)
- return request(request_uri, path, body, 'POST', nil)
-end
-
+local function fetch_full_registry(premature, reg)
+ if premature or reg.stop_flag then
+ return
+ end
-local function get_token_param(base_uri, username, password)
- if not username or not password then
- return ''
+ local dict = get_dict()
+ if not dict then
+ log.error("nacos shared dict not available")
+ return
end
- local args = { username = username, password = password}
- local data, err = post_url(base_uri, auth_path .. '?' .. ngx.encode_args(args), nil)
- if err then
- log.error('nacos login fail:', username, ' ', password, ' desc:', err)
- return nil, err
+ local services = reg.service_scanner()
+ if reg.stop_flag then
+ return
end
- return '&accessToken=' .. data.accessToken
-end
+ local prefix = reg.id .. "/"
-local function get_namespace_param(namespace_id)
- local param = ''
- if namespace_id then
- local args = {namespaceId = namespace_id}
- param = '&' .. ngx.encode_args(args)
+ if #services == 0 then
+ local all_keys = dict:get_keys(0)
+ for _, key in ipairs(all_keys) do
+ if core.string.has_prefix(key, prefix) then
+ dict:delete(key)
+ end
+ end
+ if not reg.stop_flag then
+ ngx_timer_at(reg.conf.fetch_interval or 30, fetch_full_registry, reg)
+ end
+ return
end
- return param
-end
+ local hosts = reg.conf.host
+ local host_count = #hosts
+ local start = math_random(host_count)
+ local timeout = reg.conf.timeout
-local function get_group_name_param(group_name)
- local param = ''
- if group_name then
- local args = {groupName = group_name}
- param = '&' .. ngx.encode_args(args)
- end
- return param
-end
+ for i = 0, host_count - 1 do
+ if reg.stop_flag then
+ return
+ end
+ local idx = (start + i - 1) % host_count + 1
+ local base_uri, username, password = nacos_client.build_base_uri(
+ hosts[idx], reg.conf.prefix)
-local function get_signed_param(group_name, service_name)
- local param = ''
- if access_key ~= '' and secret_key ~= '' then
- local str_to_sign = ngx.now() * 1000 .. '@@' .. group_name .. '@@' .. service_name
- local args = {
- ak = access_key,
- data = str_to_sign,
- signature = ngx.encode_base64(ngx.hmac_sha1(secret_key, str_to_sign))
- }
- param = '&' .. ngx.encode_args(args)
- end
- return param
-end
+ if not base_uri then
+ log.warn("nacos host at index ", idx, " is invalid, skip")
+ else
+ local nodes_cache, service_names, err = nacos_client.fetch_from_host(
+ base_uri,
+ username or reg.username,
+ password or reg.password,
+ services, {
+ default_weight = reg.conf.weight,
+ access_key = reg.conf.access_key,
+ secret_key = reg.conf.secret_key,
+ timeout = timeout,
+ preserve_metadata = reg.preserve_metadata,
+ key_builder = reg.key_builder,
+ })
+
+ if nodes_cache then
+ if reg.stop_flag then
+ return
+ end
+ for key, nodes in pairs(nodes_cache) do
+ dict:set(key, core.json.encode(nodes))
+ end
-local function build_base_uri(url)
- local auth_idx = core.string.rfind_char(url, '@')
- local username, password
- if auth_idx then
- local protocol_idx = str_find(url, '://')
- local protocol = string_sub(url, 1, protocol_idx + 2)
- local user_and_password = string_sub(url, protocol_idx + 3, auth_idx - 1)
- local arr = ngx_re.split(user_and_password, ':')
- if #arr == 2 then
- username = arr[1]
- password = arr[2]
- end
- local other = string_sub(url, auth_idx + 1)
- url = protocol .. other
- end
+ local all_keys = dict:get_keys(0)
+ for _, key in ipairs(all_keys) do
+ if core.string.has_prefix(key, prefix)
+ and not service_names[key] then
+ dict:delete(key)
+ end
+ end
- if local_conf.discovery.nacos.prefix then
- url = url .. local_conf.discovery.nacos.prefix
+ log.info("nacos registry updated, id: ", reg.id,
+ ", services: ", #services)
+ if not reg.stop_flag then
+ ngx_timer_at(reg.conf.fetch_interval or 30,
+ fetch_full_registry, reg)
+ end
+ return
+ end
+ log.error("fetch_from_host: ", base_uri, " err:", err)
+ end
end
- if str_byte(url, #url) ~= str_byte('/') then
- url = url .. '/'
+ log.error("failed to fetch nacos registry from all hosts, id: ", reg.id)
+ if not reg.stop_flag then
+ ngx_timer_at(reg.conf.fetch_interval or 30, fetch_full_registry, reg)
end
-
- return url, username, password
end
-local function get_base_uri_by_index(index)
- local host = local_conf.discovery.nacos.host
+-- ─── Registry management API ──────────────────────────────────────────
+
+--- Create a nacos registry instance.
+---
+--- conf fields: id, host (array), fetch_interval, prefix, weight,
+--- access_key, secret_key, timeout ({connect,send,read} in ms)
+---
+--- options: service_scanner (function), preserve_metadata (bool),
+--- key_builder (function(ns,group,svc)->string),
+--- username (string), password (string)
+function _M.create_registry(conf, options)
+ options = options or {}
+ local id = conf.id
+ if not id or id == "" then
+ return nil, "registry id is required"
+ end
- local url = host[index]
- if not url then
- return nil
+ if registries[id] then
+ _M.stop_registry(id)
end
- return build_base_uri(url)
+ local reg = {
+ id = id,
+ conf = conf,
+ stop_flag = false,
+ preserve_metadata = options.preserve_metadata or false,
+ key_builder = options.key_builder or default_key_builder(id),
+ service_scanner = options.service_scanner or function()
+ return nacos_client.get_nacos_services()
+ end,
+ username = options.username,
+ password = options.password,
+ }
+
+ registries[id] = reg
+ return reg
end
-local function de_duplication(services, namespace_id, group_name, service_name, scheme)
- for _, service in ipairs(services) do
- if service.namespace_id == namespace_id and service.group_name == group_name
- and service.service_name == service_name and service.scheme == scheme then
- return true
- end
- end
- return false
+function _M.start_registry(reg)
+ ngx_timer_at(0, fetch_full_registry, reg)
end
-local function iter_and_add_service(services, values)
- if not values then
+function _M.stop_registry(id)
+ local reg = registries[id]
+ if not reg then
return
end
- for _, value in core.config_util.iterate_values(values) do
- local conf = value.value
- if not conf then
- goto CONTINUE
- end
-
- local up
- if conf.upstream then
- up = conf.upstream
- else
- up = conf
- end
-
- local namespace_id = (up.discovery_args and up.discovery_args.namespace_id)
- or default_namespace_id
-
- local group_name = (up.discovery_args and up.discovery_args.group_name)
- or default_group_name
+ reg.stop_flag = true
+ registries[id] = nil
- local dup = de_duplication(services, namespace_id, group_name,
- up.service_name, up.scheme)
- if dup then
- goto CONTINUE
- end
-
- if up.discovery_type == 'nacos' then
- core.table.insert(services, {
- service_name = up.service_name,
- namespace_id = namespace_id,
- group_name = group_name,
- scheme = up.scheme,
- })
+ local dict = get_dict()
+ if dict then
+ local prefix = id .. "/"
+ local all_keys = dict:get_keys(0)
+ for _, key in ipairs(all_keys) do
+ if core.string.has_prefix(key, prefix) then
+ dict:delete(key)
+ end
end
- ::CONTINUE::
end
end
-local function get_nacos_services()
- local services = {}
-
- -- here we use lazy load to work around circle dependency
- local get_upstreams = require('apisix.upstream').upstreams
- local get_routes = require('apisix.router').http_routes
- local get_stream_routes = require('apisix.router').stream_routes
- local get_services = require('apisix.http.service').services
- local values = get_upstreams()
- iter_and_add_service(services, values)
- values = get_routes()
- iter_and_add_service(services, values)
- values = get_services()
- iter_and_add_service(services, values)
- values = get_stream_routes()
- iter_and_add_service(services, values)
- return services
-end
-
-local function is_grpc(scheme)
- if scheme == 'grpc' or scheme == 'grpcs' then
- return true
- end
-
- return false
+function _M.get_registry(id)
+ return registries[id]
end
-local curr_service_in_use = {}
-
-
-local function fetch_from_host(base_uri, username, password, services)
- local token_param, err = get_token_param(base_uri, username, password)
- if err then
- return false, err
- end
-
- local service_names = {}
- local nodes_cache = {}
- local had_success = false
-
- for _, service_info in ipairs(services) do
- local namespace_id = service_info.namespace_id
- local group_name = service_info.group_name
- local scheme = service_info.scheme or ''
- local namespace_param = get_namespace_param(namespace_id)
- local group_name_param = get_group_name_param(group_name)
- local signature_param = get_signed_param(group_name, service_info.service_name)
- local query_path = instance_list_path .. service_info.service_name
- .. token_param .. namespace_param .. group_name_param
- .. signature_param
- local data, req_err = get_url(base_uri, query_path)
- if req_err then
- log.error('failed to fetch instances for service [', service_info.service_name,
- '] from ', base_uri, ', error: ', req_err)
- else
- had_success = true
-
- local key = get_key(namespace_id, group_name, service_info.service_name)
- service_names[key] = true
- local hosts = data.hosts
- if type(hosts) ~= 'table' then
- hosts = {}
- end
-
- local nodes = {}
- for _, host in ipairs(hosts) do
- local node = {
- host = host.ip,
- port = host.port,
- weight = host.weight or default_weight,
- }
- -- docs: https://github.com/yidongnan/grpc-spring-boot-starter/pull/496
- if is_grpc(scheme) and host.metadata and host.metadata.gRPC_port then
- node.port = host.metadata.gRPC_port
- end
-
- core.table.insert(nodes, node)
- end
-
- if #nodes > 0 then
- nodes_cache[key] = nodes
- end
- end
- end
+-- ─── Shared helpers ──────────────────────────────────────────────────
- if not had_success then
- return false, 'all nacos services fetch failed'
+local function match_metadata(node_metadata, upstream_metadata)
+ if upstream_metadata == nil then
+ return true
end
- for key, nodes in pairs(nodes_cache) do
- local content = core.json.encode(nodes)
- nacos_dict:set(key, content)
+ if not node_metadata then
+ node_metadata = {}
end
- for key, _ in pairs(curr_service_in_use) do
- if not service_names[key] then
- nacos_dict:delete(key)
+ for k, v in pairs(upstream_metadata) do
+ if not node_metadata[k] or node_metadata[k] ~= v then
+ return false
end
end
- curr_service_in_use = service_names
return true
end
-local function fetch_full_registry(premature)
- if premature then
- return
+function _M.get_nodes(key, metadata)
+ local dict = get_dict()
+ if not dict then
+ return nil
end
- local infos = get_nacos_services()
- if #infos == 0 then
- return
+ local value = dict:get(key)
+ if not value then
+ return nil
end
- local host_list = local_conf.discovery.nacos.host
- local host_count = #host_list
- local start = math_random(host_count)
-
- for i = 0, host_count - 1 do
- local idx = (start + i - 1) % host_count + 1
- local base_uri, username, password = get_base_uri_by_index(idx)
+ local nodes = core.json.decode(value)
+ if not metadata then
+ return nodes
+ end
- if not base_uri then
- log.warn('nacos host at index ', idx, ' is invalid, skip')
- else
- local ok, err = fetch_from_host(base_uri, username, password, infos)
- if ok then
- return
- end
- log.error('fetch_from_host: ', base_uri, ' err:', err)
+ local res = {}
+ for _, node in ipairs(nodes) do
+ if match_metadata(node.metadata, metadata) then
+ core.table.insert(res, node)
end
end
-
- log.error('failed to fetch nacos registry from all hosts')
+ return res
end
+-- ─── Standard discovery interface ─────────────────────────────────────
+
function _M.nodes(service_name, discovery_args)
local namespace_id = discovery_args and
- discovery_args.namespace_id or default_namespace_id
+ discovery_args.namespace_id or "public"
local group_name = discovery_args
- and discovery_args.group_name or default_group_name
- local key = get_key(namespace_id, group_name, service_name)
- local value = nacos_dict:get(key)
- if not value then
- core.log.error("nacos service not found: ", service_name)
- return nil
- end
- local nodes = core.json.decode(value)
- return nodes
+ and discovery_args.group_name or "DEFAULT_GROUP"
+
+ local key = "default/" .. namespace_id .. "/" .. group_name .. "/" .. service_name
+ return _M.get_nodes(key)
end
function _M.init_worker()
- default_weight = local_conf.discovery.nacos.weight
- log.info('default_weight:', default_weight)
- local fetch_interval = local_conf.discovery.nacos.fetch_interval
- log.info('fetch_interval:', fetch_interval)
- access_key = local_conf.discovery.nacos.access_key
- secret_key = local_conf.discovery.nacos.secret_key
- ngx_timer_at(0, fetch_full_registry)
- ngx_timer_every(fetch_interval, fetch_full_registry)
+ local dict = ngx.shared[dict_name]
+ if not dict then
+ error('lua_shared_dict "' .. dict_name .. '" not configured')
+ end
+
+ nacos_dict = dict
+
+ local nacos_conf = local_conf.discovery and local_conf.discovery.nacos
+ if not nacos_conf then
+ return
+ end
+
+ -- shallow copy to avoid mutating cached config
+ local conf = {}
+ for k, v in pairs(nacos_conf) do
+ conf[k] = v
+ end
+ conf.id = "default"
+ local reg = _M.create_registry(conf)
+ _M.start_registry(reg)
end
function _M.dump_data()
- local keys = nacos_dict:get_keys(0)
+ local dict = get_dict()
+ if not dict then
+ return {services = {}}
+ end
+
+ local keys = dict:get_keys(0)
local applications = {}
for _, key in ipairs(keys) do
- local value = nacos_dict:get(key)
+ local value = dict:get(key)
if value then
local nodes = core.json.decode(value)
if nodes then
diff --git a/t/APISIX.pm b/t/APISIX.pm
index 3ea057123..ad6f76d64 100644
--- a/t/APISIX.pm
+++ b/t/APISIX.pm
@@ -423,6 +423,7 @@ _EOC_
lua_shared_dict kubernetes-stream 1m;
lua_shared_dict kubernetes-first-stream 1m;
lua_shared_dict kubernetes-second-stream 1m;
+ lua_shared_dict nacos-stream 10m;
lua_shared_dict tars-stream 1m;
upstream apisix_backend {
diff --git a/t/discovery/nacos2.t b/t/discovery/nacos2.t
index 044067ebd..5d57b288e 100644
--- a/t/discovery/nacos2.t
+++ b/t/discovery/nacos2.t
@@ -83,7 +83,7 @@ GET /hello
--- response_body_like eval
qr/server [1-2]/
--- error_log
-error: status = 502
+err:status = 502
@@ -333,7 +333,7 @@ discovery:
local body = json_decode(res.body)
local services = body.services
- local service = services["public.DEFAULT_GROUP.APISIX-NACOS"]
+ local service = services["default/public/DEFAULT_GROUP/APISIX-NACOS"]
local number = table.getn(service.nodes)
ngx.say(number)
}