nic-6443 commented on code in PR #13201: URL: https://github.com/apache/apisix/pull/13201#discussion_r3071702940
########## apisix/discovery/nacos/client.lua: ########## @@ -0,0 +1,362 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- + +--- Reusable HTTP client primitives for Nacos service discovery. +--- Extracted from init.lua so that both static-config mode and +--- dynamic-config mode can share the same core logic. + +local require = require +local http = require('resty.http') +local core = require('apisix.core') +local ipairs = ipairs +local type = type +local ngx = ngx +local string = string +local string_sub = string.sub +local str_byte = string.byte +local str_find = core.string.find +local log = core.log + +local auth_path = 'auth/login' +local instance_list_path = 'ns/instance/list?healthyOnly=true&serviceName=' +local default_namespace_id = "public" +local default_group_name = "DEFAULT_GROUP" + + +local _M = {} + + +-- ─── HTTP primitives ────────────────────────────────────────────────── + +function _M.request(request_uri, path, body, method, basic_auth, timeout) + local url = request_uri .. path + log.info('request url:', request_uri, path) Review Comment: This is pre-existing behavior from the original `init.lua` — we just extracted it into `client.lua` without changing the logging. 
Agree it should be improved but I'd prefer to handle it in a separate follow-up PR to keep this refactoring focused. ########## apisix/discovery/nacos/client.lua: ########## @@ -0,0 +1,362 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- + +--- Reusable HTTP client primitives for Nacos service discovery. +--- Extracted from init.lua so that both static-config mode and +--- dynamic-config mode can share the same core logic. + +local require = require +local http = require('resty.http') +local core = require('apisix.core') +local ipairs = ipairs +local type = type +local ngx = ngx +local string = string +local string_sub = string.sub +local str_byte = string.byte +local str_find = core.string.find +local log = core.log + +local auth_path = 'auth/login' +local instance_list_path = 'ns/instance/list?healthyOnly=true&serviceName=' +local default_namespace_id = "public" +local default_group_name = "DEFAULT_GROUP" + + +local _M = {} + + +-- ─── HTTP primitives ────────────────────────────────────────────────── + +function _M.request(request_uri, path, body, method, basic_auth, timeout) + local url = request_uri .. 
path + log.info('request url:', request_uri, path) + local headers = {} + headers['Accept'] = 'application/json' + + if basic_auth then + headers['Authorization'] = basic_auth + end + + if body and 'table' == type(body) then + local err + body, err = core.json.encode(body) + if not body then + return nil, 'invalid body : ' .. err + end + headers['Content-Type'] = 'application/json' + end + + local httpc = http.new() + timeout = timeout or {} + local connect_timeout = timeout.connect or 2000 + local send_timeout = timeout.send or 5000 + local read_timeout = timeout.read or 5000 + httpc:set_timeouts(connect_timeout, send_timeout, read_timeout) + local res, err = httpc:request_uri(url, { + method = method, + headers = headers, + body = body, + ssl_verify = true, + }) + if not res then + return nil, err + end + + if not res.body or res.status ~= 200 then + return nil, 'status = ' .. res.status + end + + local json_str = res.body + local data, decode_err = core.json.decode(json_str) + if not data then + return nil, decode_err + end + return data +end + + +-- ─── authentication ─────────────────────────────────────────────────── + +function _M.get_token_param(base_uri, username, password, timeout) + if not username or not password then + return '' + end + + local args = { username = username, password = password } + local data, err = _M.request(base_uri, auth_path .. '?' .. ngx.encode_args(args), + nil, 'POST', nil, timeout) + if err then + log.error('nacos login fail:', username, ' desc:', err) + return nil, err + end + if type(data) ~= "table" or not data.accessToken or data.accessToken == "" then + return nil, 'nacos login response missing accessToken' + end + return '&accessToken=' .. data.accessToken +end + + +function _M.get_signed_param(group_name, service_name, access_key, secret_key) + local param = '' + if access_key and access_key ~= '' and secret_key and secret_key ~= '' then + local str_to_sign = ngx.now() * 1000 .. '@@' .. group_name .. '@@' .. 
service_name + local args = { + ak = access_key, + data = str_to_sign, + signature = ngx.encode_base64(ngx.hmac_sha1(secret_key, str_to_sign)) + } + param = '&' .. ngx.encode_args(args) + end + return param +end + + +-- ─── URL building ───────────────────────────────────────────────────── + +function _M.build_base_uri(url, prefix) + local auth_idx = core.string.rfind_char(url, '@') + local username, password + if auth_idx then + local protocol_idx = str_find(url, '://') + local protocol = string_sub(url, 1, protocol_idx + 2) + local user_and_password = string_sub(url, protocol_idx + 3, auth_idx - 1) + local colon_idx = str_find(user_and_password, ':') + if colon_idx then + username = string_sub(user_and_password, 1, colon_idx - 1) + password = string_sub(user_and_password, colon_idx + 1) + end + local other = string_sub(url, auth_idx + 1) + url = protocol .. other + end + + if prefix then + url = url .. prefix + end + + if str_byte(url, #url) ~= str_byte('/') then + url = url .. '/' + end + + return url, username, password +end + + +-- ─── query param helpers ────────────────────────────────────────────── + +local function get_namespace_param(namespace_id) + local param = '' + if namespace_id then + local args = { namespaceId = namespace_id } + param = '&' .. ngx.encode_args(args) + end + return param +end + + +local function get_group_name_param(group_name) + local param = '' + if group_name then + local args = { groupName = group_name } + param = '&' .. ngx.encode_args(args) + end + return param +end + + +local function is_grpc(scheme) + return scheme == 'grpc' or scheme == 'grpcs' +end + + +-- ─── instance fetching ──────────────────────────────────────────────── + +--- Fetch instances from a single nacos host for a list of services. 
+--- +--- Returns: nodes_cache (table of key → nodes), service_names (set of key → true) +--- On failure: nil, nil, err_string +--- +--- options: +--- default_weight (number) default node weight +--- access_key (string) AK for HMAC-SHA1 signing (optional) +--- secret_key (string) SK for HMAC-SHA1 signing (optional) +--- timeout (table) { connect, send, read } in ms +--- preserve_metadata (bool) include instance.metadata in returned nodes +--- key_builder (function) key_builder(namespace_id, group_name, service_name) +--- returns the key to use for this service in the result. +--- default: ns_id.group.service + +function _M.fetch_from_host(base_uri, username, password, services, options) + options = options or {} + local dw = options.default_weight or 100 + local ak = options.access_key + local sk = options.secret_key + local timeout = options.timeout + local preserve_metadata = options.preserve_metadata + local key_builder = options.key_builder + + local token_param, err = _M.get_token_param(base_uri, username, password, timeout) + if err then + return nil, nil, err + end + + local service_names = {} + local nodes_cache = {} + local had_success = false + + for _, service_info in ipairs(services) do + local namespace_id = service_info.namespace_id + local group_name = service_info.group_name + local scheme = service_info.scheme or '' + local namespace_param = get_namespace_param(namespace_id) + local group_name_param = get_group_name_param(group_name) + local signature_param = _M.get_signed_param( + group_name, service_info.service_name, ak, sk) + local query_path = instance_list_path .. service_info.service_name + .. token_param .. namespace_param .. group_name_param + .. signature_param + local data, req_err = _M.request(base_uri, query_path, nil, 'GET', nil, timeout) + if req_err then + log.error('get_url:', query_path, ' err:', req_err) Review Comment: Same as above — this error log format is carried over from the original code. 
Will address credential scrubbing in a follow-up PR. ########## apisix/discovery/nacos/client.lua: ########## @@ -0,0 +1,362 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- + +--- Reusable HTTP client primitives for Nacos service discovery. +--- Extracted from init.lua so that both static-config mode and +--- dynamic-config mode can share the same core logic. + +local require = require +local http = require('resty.http') +local core = require('apisix.core') +local ipairs = ipairs +local type = type +local ngx = ngx +local string = string +local string_sub = string.sub +local str_byte = string.byte +local str_find = core.string.find +local log = core.log + +local auth_path = 'auth/login' +local instance_list_path = 'ns/instance/list?healthyOnly=true&serviceName=' +local default_namespace_id = "public" +local default_group_name = "DEFAULT_GROUP" + + +local _M = {} + + +-- ─── HTTP primitives ────────────────────────────────────────────────── + +function _M.request(request_uri, path, body, method, basic_auth, timeout) + local url = request_uri .. 
path + log.info('request url:', request_uri, path) + local headers = {} + headers['Accept'] = 'application/json' + + if basic_auth then + headers['Authorization'] = basic_auth + end + + if body and 'table' == type(body) then + local err + body, err = core.json.encode(body) + if not body then + return nil, 'invalid body : ' .. err + end + headers['Content-Type'] = 'application/json' + end + + local httpc = http.new() + timeout = timeout or {} + local connect_timeout = timeout.connect or 2000 + local send_timeout = timeout.send or 5000 + local read_timeout = timeout.read or 5000 + httpc:set_timeouts(connect_timeout, send_timeout, read_timeout) + local res, err = httpc:request_uri(url, { + method = method, + headers = headers, + body = body, + ssl_verify = true, + }) + if not res then + return nil, err + end + + if not res.body or res.status ~= 200 then + return nil, 'status = ' .. res.status + end + + local json_str = res.body + local data, decode_err = core.json.decode(json_str) + if not data then + return nil, decode_err + end + return data +end + + +-- ─── authentication ─────────────────────────────────────────────────── + +function _M.get_token_param(base_uri, username, password, timeout) + if not username or not password then + return '' + end + + local args = { username = username, password = password } + local data, err = _M.request(base_uri, auth_path .. '?' .. ngx.encode_args(args), + nil, 'POST', nil, timeout) + if err then + log.error('nacos login fail:', username, ' desc:', err) + return nil, err + end + if type(data) ~= "table" or not data.accessToken or data.accessToken == "" then + return nil, 'nacos login response missing accessToken' + end + return '&accessToken=' .. data.accessToken +end + + +function _M.get_signed_param(group_name, service_name, access_key, secret_key) + local param = '' + if access_key and access_key ~= '' and secret_key and secret_key ~= '' then + local str_to_sign = ngx.now() * 1000 .. '@@' .. group_name .. '@@' .. 
service_name + local args = { + ak = access_key, + data = str_to_sign, + signature = ngx.encode_base64(ngx.hmac_sha1(secret_key, str_to_sign)) + } + param = '&' .. ngx.encode_args(args) + end + return param +end + + +-- ─── URL building ───────────────────────────────────────────────────── + +function _M.build_base_uri(url, prefix) + local auth_idx = core.string.rfind_char(url, '@') + local username, password + if auth_idx then + local protocol_idx = str_find(url, '://') Review Comment: Good catch. Added a nil guard for `protocol_idx` — now returns `nil` for malformed URLs. Fixed in 03d8241. ########## apisix/discovery/nacos/init.lua: ########## @@ -17,418 +17,295 @@ local require = require local local_conf = require('apisix.core.config_local').local_conf() -local http = require('resty.http') local core = require('apisix.core') +local nacos_client = require('apisix.discovery.nacos.client') +local is_http = ngx.config.subsystem == "http" local ipairs = ipairs local pairs = pairs -local type = type +local error = error local math_random = math.random local ngx = ngx -local ngx_re = require('ngx.re') local ngx_timer_at = ngx.timer.at -local ngx_timer_every = ngx.timer.every -local string = string -local string_sub = string.sub -local str_byte = string.byte -local str_find = core.string.find local log = core.log -local default_weight -local nacos_dict = ngx.shared.nacos --key: namespace_id.group_name.service_name -if not nacos_dict then - error("lua_shared_dict \"nacos\" not configured") -end - -local auth_path = 'auth/login' -local instance_list_path = 'ns/instance/list?healthyOnly=true&serviceName=' -local default_namespace_id = "public" -local default_group_name = "DEFAULT_GROUP" -local access_key -local secret_key - - local _M = {} -local function get_key(namespace_id, group_name, service_name) - return namespace_id .. '.' .. group_name .. '.' .. service_name -end - -local function request(request_uri, path, body, method, basic_auth) - local url = request_uri .. 
path - log.info('request url:', url) - local headers = {} - headers['Accept'] = 'application/json' - - if basic_auth then - headers['Authorization'] = basic_auth - end - - if body and 'table' == type(body) then - local err - body, err = core.json.encode(body) - if not body then - return nil, 'invalid body : ' .. err - end - headers['Content-Type'] = 'application/json' - end +local nacos_dict +local registries = {} - local httpc = http.new() - local timeout = local_conf.discovery.nacos.timeout - local connect_timeout = timeout.connect - local send_timeout = timeout.send - local read_timeout = timeout.read - log.info('connect_timeout:', connect_timeout, ', send_timeout:', send_timeout, - ', read_timeout:', read_timeout) - httpc:set_timeouts(connect_timeout, send_timeout, read_timeout) - local res, err = httpc:request_uri(url, { - method = method, - headers = headers, - body = body, - ssl_verify = true, - }) - if not res then - return nil, err - end - - if not res.body or res.status ~= 200 then - return nil, 'status = ' .. res.status - end +local dict_name = is_http and "nacos" or "nacos-stream" - local json_str = res.body - local data, err = core.json.decode(json_str) - if not data then - return nil, err +local function get_dict() + if not nacos_dict then + nacos_dict = ngx.shared[dict_name] end - return data + return nacos_dict end -local function get_url(request_uri, path) - return request(request_uri, path, nil, 'GET', nil) -end - - -local function post_url(request_uri, path, body) - return request(request_uri, path, body, 'POST', nil) -end - - -local function get_token_param(base_uri, username, password) - if not username or not password then - return '' - end - - local args = { username = username, password = password} - local data, err = post_url(base_uri, auth_path .. '?' .. 
ngx.encode_args(args), nil) - if err then - log.error('nacos login fail:', username, ' ', password, ' desc:', err) - return nil, err +local function default_key_builder(id) + return function(namespace_id, group_name, service_name) + return id .. "/" .. namespace_id .. "/" .. group_name .. "/" .. service_name end - return '&accessToken=' .. data.accessToken end -local function get_namespace_param(namespace_id) - local param = '' - if namespace_id then - local args = {namespaceId = namespace_id} - param = '&' .. ngx.encode_args(args) +local function fetch_full_registry(premature, reg) + if premature or reg.stop_flag then + return end - return param -end - -local function get_group_name_param(group_name) - local param = '' - if group_name then - local args = {groupName = group_name} - param = '&' .. ngx.encode_args(args) + local dict = get_dict() + if not dict then + log.error("nacos shared dict not available") + return end - return param -end - -local function get_signed_param(group_name, service_name) - local param = '' - if access_key ~= '' and secret_key ~= '' then - local str_to_sign = ngx.now() * 1000 .. '@@' .. group_name .. '@@' .. service_name - local args = { - ak = access_key, - data = str_to_sign, - signature = ngx.encode_base64(ngx.hmac_sha1(secret_key, str_to_sign)) - } - param = '&' .. ngx.encode_args(args) + local services = reg.service_scanner() + if reg.stop_flag then + return end - return param -end + local prefix = reg.id .. 
"/" -local function build_base_uri(url) - local auth_idx = core.string.rfind_char(url, '@') - local username, password - if auth_idx then - local protocol_idx = str_find(url, '://') - local protocol = string_sub(url, 1, protocol_idx + 2) - local user_and_password = string_sub(url, protocol_idx + 3, auth_idx - 1) - local arr = ngx_re.split(user_and_password, ':') - if #arr == 2 then - username = arr[1] - password = arr[2] + if #services == 0 then + local all_keys = dict:get_keys(0) + for _, key in ipairs(all_keys) do + if core.string.has_prefix(key, prefix) then + dict:delete(key) + end end - local other = string_sub(url, auth_idx + 1) - url = protocol .. other - end - - if local_conf.discovery.nacos.prefix then - url = url .. local_conf.discovery.nacos.prefix - end - - if str_byte(url, #url) ~= str_byte('/') then - url = url .. '/' - end - - return url, username, password -end - - -local function get_base_uri_by_index(index) - local host = local_conf.discovery.nacos.host - - local url = host[index] - if not url then - return nil - end - - return build_base_uri(url) -end - - -local function de_duplication(services, namespace_id, group_name, service_name, scheme) - for _, service in ipairs(services) do - if service.namespace_id == namespace_id and service.group_name == group_name - and service.service_name == service_name and service.scheme == scheme then - return true + if not reg.stop_flag then + ngx_timer_at(reg.conf.fetch_interval or 30, fetch_full_registry, reg) end - end - return false -end - - -local function iter_and_add_service(services, values) - if not values then return end - for _, value in core.config_util.iterate_values(values) do - local conf = value.value - if not conf then - goto CONTINUE + local hosts = reg.conf.host + local host_count = #hosts + local start = math_random(host_count) + local timeout = reg.conf.timeout + + for i = 0, host_count - 1 do + if reg.stop_flag then + return end - local up - if conf.upstream then - up = conf.upstream + 
local idx = (start + i - 1) % host_count + 1 + local base_uri, username, password = nacos_client.build_base_uri( + hosts[idx], reg.conf.prefix) + + if not base_uri then + log.warn("nacos host at index ", idx, " is invalid, skip") else - up = conf - end + local nodes_cache, service_names, err = nacos_client.fetch_from_host( + base_uri, + username or reg.username, + password or reg.password, + services, { + default_weight = reg.conf.weight, + access_key = reg.conf.access_key, + secret_key = reg.conf.secret_key, + timeout = timeout, + preserve_metadata = reg.preserve_metadata, + key_builder = reg.key_builder, + }) + + if nodes_cache then + if reg.stop_flag then + return + end - local namespace_id = (up.discovery_args and up.discovery_args.namespace_id) - or default_namespace_id + for key, nodes in pairs(nodes_cache) do + dict:set(key, core.json.encode(nodes)) + end - local group_name = (up.discovery_args and up.discovery_args.group_name) - or default_group_name + local all_keys = dict:get_keys(0) + for _, key in ipairs(all_keys) do + if core.string.has_prefix(key, prefix) + and not service_names[key] then + dict:delete(key) + end + end - local dup = de_duplication(services, namespace_id, group_name, - up.service_name, up.scheme) - if dup then - goto CONTINUE + if not reg.stop_flag then + ngx_timer_at(reg.conf.fetch_interval or 30, + fetch_full_registry, reg) + end + return + end + log.error("fetch_from_host: ", base_uri, " err:", err) end + end - if up.discovery_type == 'nacos' then - core.table.insert(services, { - service_name = up.service_name, - namespace_id = namespace_id, - group_name = group_name, - scheme = up.scheme, - }) - end - ::CONTINUE:: + log.error("failed to fetch nacos registry from all hosts, id: ", reg.id) + if not reg.stop_flag then + ngx_timer_at(reg.conf.fetch_interval or 30, fetch_full_registry, reg) end end -local function get_nacos_services() - local services = {} - - -- here we use lazy load to work around circle dependency - local 
get_upstreams = require('apisix.upstream').upstreams - local get_routes = require('apisix.router').http_routes - local get_stream_routes = require('apisix.router').stream_routes - local get_services = require('apisix.http.service').services - local values = get_upstreams() - iter_and_add_service(services, values) - values = get_routes() - iter_and_add_service(services, values) - values = get_services() - iter_and_add_service(services, values) - values = get_stream_routes() - iter_and_add_service(services, values) - return services +-- ─── Registry management API ────────────────────────────────────────── + +--- Create a nacos registry instance. +--- +--- conf fields: id, host (array), fetch_interval, prefix, weight, +--- access_key, secret_key, timeout ({connect,send,read} in ms) +--- +--- options: service_scanner (function), preserve_metadata (bool), +--- key_builder (function(ns,group,svc)->string), +--- username (string), password (string) +function _M.create_registry(conf, options) + options = options or {} + local id = conf.id + local reg = { + id = id, + conf = conf, + stop_flag = false, + preserve_metadata = options.preserve_metadata or false, + key_builder = options.key_builder or default_key_builder(id), + service_scanner = options.service_scanner or function() + return nacos_client.get_nacos_services() + end, + username = options.username, + password = options.password, + } + + registries[id] = reg + return reg Review Comment: Fixed — `create_registry()` now validates that `id` is non-nil/non-empty (returns `nil, err`), and auto-calls `stop_registry(id)` if the id is already in use before creating the new one. Fixed in 03d8241. 
########## apisix/discovery/nacos/init.lua: ########## @@ -17,418 +17,295 @@ local require = require local local_conf = require('apisix.core.config_local').local_conf() -local http = require('resty.http') local core = require('apisix.core') +local nacos_client = require('apisix.discovery.nacos.client') +local is_http = ngx.config.subsystem == "http" local ipairs = ipairs local pairs = pairs -local type = type +local error = error local math_random = math.random local ngx = ngx -local ngx_re = require('ngx.re') local ngx_timer_at = ngx.timer.at -local ngx_timer_every = ngx.timer.every -local string = string -local string_sub = string.sub -local str_byte = string.byte -local str_find = core.string.find local log = core.log -local default_weight -local nacos_dict = ngx.shared.nacos --key: namespace_id.group_name.service_name -if not nacos_dict then - error("lua_shared_dict \"nacos\" not configured") -end - -local auth_path = 'auth/login' -local instance_list_path = 'ns/instance/list?healthyOnly=true&serviceName=' -local default_namespace_id = "public" -local default_group_name = "DEFAULT_GROUP" -local access_key -local secret_key - - local _M = {} -local function get_key(namespace_id, group_name, service_name) - return namespace_id .. '.' .. group_name .. '.' .. service_name -end - -local function request(request_uri, path, body, method, basic_auth) - local url = request_uri .. path - log.info('request url:', url) - local headers = {} - headers['Accept'] = 'application/json' - - if basic_auth then - headers['Authorization'] = basic_auth - end - - if body and 'table' == type(body) then - local err - body, err = core.json.encode(body) - if not body then - return nil, 'invalid body : ' .. 
err - end - headers['Content-Type'] = 'application/json' - end +local nacos_dict +local registries = {} - local httpc = http.new() - local timeout = local_conf.discovery.nacos.timeout - local connect_timeout = timeout.connect - local send_timeout = timeout.send - local read_timeout = timeout.read - log.info('connect_timeout:', connect_timeout, ', send_timeout:', send_timeout, - ', read_timeout:', read_timeout) - httpc:set_timeouts(connect_timeout, send_timeout, read_timeout) - local res, err = httpc:request_uri(url, { - method = method, - headers = headers, - body = body, - ssl_verify = true, - }) - if not res then - return nil, err - end - - if not res.body or res.status ~= 200 then - return nil, 'status = ' .. res.status - end +local dict_name = is_http and "nacos" or "nacos-stream" - local json_str = res.body - local data, err = core.json.decode(json_str) - if not data then - return nil, err +local function get_dict() + if not nacos_dict then + nacos_dict = ngx.shared[dict_name] end - return data + return nacos_dict end -local function get_url(request_uri, path) - return request(request_uri, path, nil, 'GET', nil) -end - - -local function post_url(request_uri, path, body) - return request(request_uri, path, body, 'POST', nil) -end - - -local function get_token_param(base_uri, username, password) - if not username or not password then - return '' - end - - local args = { username = username, password = password} - local data, err = post_url(base_uri, auth_path .. '?' .. ngx.encode_args(args), nil) - if err then - log.error('nacos login fail:', username, ' ', password, ' desc:', err) - return nil, err +local function default_key_builder(id) + return function(namespace_id, group_name, service_name) + return id .. "/" .. namespace_id .. "/" .. group_name .. "/" .. service_name end - return '&accessToken=' .. 
data.accessToken end -local function get_namespace_param(namespace_id) - local param = '' - if namespace_id then - local args = {namespaceId = namespace_id} - param = '&' .. ngx.encode_args(args) +local function fetch_full_registry(premature, reg) + if premature or reg.stop_flag then + return end - return param -end - -local function get_group_name_param(group_name) - local param = '' - if group_name then - local args = {groupName = group_name} - param = '&' .. ngx.encode_args(args) + local dict = get_dict() + if not dict then + log.error("nacos shared dict not available") + return end - return param -end - -local function get_signed_param(group_name, service_name) - local param = '' - if access_key ~= '' and secret_key ~= '' then - local str_to_sign = ngx.now() * 1000 .. '@@' .. group_name .. '@@' .. service_name - local args = { - ak = access_key, - data = str_to_sign, - signature = ngx.encode_base64(ngx.hmac_sha1(secret_key, str_to_sign)) - } - param = '&' .. ngx.encode_args(args) + local services = reg.service_scanner() + if reg.stop_flag then + return end - return param -end + local prefix = reg.id .. "/" -local function build_base_uri(url) - local auth_idx = core.string.rfind_char(url, '@') - local username, password - if auth_idx then - local protocol_idx = str_find(url, '://') - local protocol = string_sub(url, 1, protocol_idx + 2) - local user_and_password = string_sub(url, protocol_idx + 3, auth_idx - 1) - local arr = ngx_re.split(user_and_password, ':') - if #arr == 2 then - username = arr[1] - password = arr[2] + if #services == 0 then + local all_keys = dict:get_keys(0) + for _, key in ipairs(all_keys) do + if core.string.has_prefix(key, prefix) then + dict:delete(key) + end end - local other = string_sub(url, auth_idx + 1) - url = protocol .. other - end - - if local_conf.discovery.nacos.prefix then - url = url .. local_conf.discovery.nacos.prefix - end - - if str_byte(url, #url) ~= str_byte('/') then - url = url .. 
'/' - end - - return url, username, password -end - - -local function get_base_uri_by_index(index) - local host = local_conf.discovery.nacos.host - - local url = host[index] - if not url then - return nil - end - - return build_base_uri(url) -end - - -local function de_duplication(services, namespace_id, group_name, service_name, scheme) - for _, service in ipairs(services) do - if service.namespace_id == namespace_id and service.group_name == group_name - and service.service_name == service_name and service.scheme == scheme then - return true + if not reg.stop_flag then + ngx_timer_at(reg.conf.fetch_interval or 30, fetch_full_registry, reg) end - end - return false -end - - -local function iter_and_add_service(services, values) - if not values then return end - for _, value in core.config_util.iterate_values(values) do - local conf = value.value - if not conf then - goto CONTINUE + local hosts = reg.conf.host + local host_count = #hosts + local start = math_random(host_count) + local timeout = reg.conf.timeout + + for i = 0, host_count - 1 do + if reg.stop_flag then + return end - local up - if conf.upstream then - up = conf.upstream + local idx = (start + i - 1) % host_count + 1 + local base_uri, username, password = nacos_client.build_base_uri( + hosts[idx], reg.conf.prefix) + + if not base_uri then + log.warn("nacos host at index ", idx, " is invalid, skip") else - up = conf - end + local nodes_cache, service_names, err = nacos_client.fetch_from_host( + base_uri, + username or reg.username, + password or reg.password, + services, { + default_weight = reg.conf.weight, + access_key = reg.conf.access_key, + secret_key = reg.conf.secret_key, + timeout = timeout, + preserve_metadata = reg.preserve_metadata, + key_builder = reg.key_builder, + }) + + if nodes_cache then + if reg.stop_flag then + return + end - local namespace_id = (up.discovery_args and up.discovery_args.namespace_id) - or default_namespace_id + for key, nodes in pairs(nodes_cache) do + dict:set(key, 
core.json.encode(nodes)) + end - local group_name = (up.discovery_args and up.discovery_args.group_name) - or default_group_name + local all_keys = dict:get_keys(0) + for _, key in ipairs(all_keys) do + if core.string.has_prefix(key, prefix) + and not service_names[key] then + dict:delete(key) + end + end Review Comment: This is the pre-existing stale-key cleanup pattern from the original `init.lua`. The dict is scoped to nacos discovery only and the number of keys is typically small (one per service). Agree it could be optimized with a local key set diff, but I'd prefer to do that as a follow-up. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscribe@apisix.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org
