wenj91 commented on issue #8782: URL: https://github.com/apache/apisix/issues/8782#issuecomment-3562219525
你可以用这个脚本修复:
```lua
--
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--

-- Nacos service discovery for APISIX, extended with per-host health
-- tracking and a cache of the last successfully fetched service data.

local require = require
local local_conf = require('apisix.core.config_local').local_conf()
local http = require('resty.http')
local core = require('apisix.core')
local ipairs = ipairs
local type = type
local math = math
local math_random = math.random
local ngx = ngx
local ngx_re = require('ngx.re')
local ngx_timer_at = ngx.timer.at
local ngx_timer_every = ngx.timer.every
local string = string
local string_sub = string.sub
local str_byte = string.byte
local str_find = core.string.find
local log = core.log

local default_weight
local applications
local auth_path = 'auth/login'
local instance_list_path = 'ns/instance/list?healthyOnly=true&serviceName='
local default_namespace_id = "public"
local default_group_name = "DEFAULT_GROUP"
local access_key
local secret_key

local events
local events_list

-- Host health tracking and retry settings (hard-coded).
local host_health_status = {}        -- host URL -> health record
local health_check_interval = 5000   -- probe period, ms (5 s)
local health_check_timeout = 2000    -- per-probe HTTP timeout, ms (2 s)
local max_failures = 3               -- failures before a host is marked unhealthy
local recovery_time = 30000          -- ms after which an unhealthy host is reset (30 s)

-- Cached discovery data.
local cached_applications = nil      -- deep copy of the last good application table
local cache_valid = false            -- whether cached_applications holds data
local last_successful_update = 0     -- ngx.now() of the last successful update
local max_cache_age = 300            -- cache lifetime, seconds (5 min)
local _M = {}


-- Handler for the "discovery_nacos_update_application" event, registered in
-- every worker except worker 0 (see _M.init_worker). Replaces this worker's
-- application table with the broadcast data and refreshes its cache copy.
local function discovery_nacos_callback(data, event, source, pid)
    applications = data
    -- Keep the cache in sync with every successful update.
    if data then
        -- Deep copy through a JSON round trip.
        cached_applications = core.json.decode(core.json.encode(data))
        cache_valid = true
        last_successful_update = ngx.now()
        log.notice("Updated cache data on successful update")
    end
    log.notice("update local variable application, event is: ", event,
               "source: ", source, "server pid:", pid,
               ", application: ", core.json.encode(applications, true))
end


-- Returns true unless host_url is currently marked unhealthy. A host whose
-- recovery period (recovery_time ms) has elapsed is reset to healthy first,
-- and that reset is reflected in this call's result.
local function check_host_health(host_url)
    local status = host_health_status[host_url]

    if status and status.unhealthy_time then
        local now = ngx.now() * 1000 -- milliseconds
        if now - status.unhealthy_time > recovery_time then
            status = {
                healthy = true,
                failure_count = 0,
                last_check_time = now
            }
            -- Update the local as well, so the check below sees the reset
            -- record instead of the stale unhealthy one.
            host_health_status[host_url] = status
            log.info("Host ", host_url, " recovery period ended, reset to healthy")
        end
    end

    if status and not status.healthy then
        return false
    end
    return true
end


-- Records a successful probe: the host is healthy and its failure count is
-- cleared.
local function mark_host_healthy(host_url)
    host_health_status[host_url] = {
        healthy = true,
        failure_count = 0,
        last_check_time = ngx.now() * 1000
    }
    log.info("Mark host as healthy: ", host_url)
end


-- Records a failed probe. Once failure_count reaches max_failures the host
-- is marked unhealthy and unhealthy_time is (re)stamped, which restarts its
-- recovery period.
local function mark_host_unhealthy(host_url, reason)
    local now = ngx.now() * 1000
    -- Default to healthy so a host without a record is excluded only after
    -- max_failures failures, exactly like an initialized host.
    local status = host_health_status[host_url]
                   or { healthy = true, failure_count = 0 }
    status.failure_count = status.failure_count + 1
    status.last_check_time = now
    status.last_error = reason

    if status.failure_count >= max_failures then
        status.healthy = false
        status.unhealthy_time = now
        log.error("Mark host as unhealthy: ", host_url, ", failures: ",
                  status.failure_count, ", reason: ", reason)
    else
        log.warn("Host failure count: ", host_url, " - ", status.failure_count,
                 "/", max_failures, ", reason: ", reason)
    end

    host_health_status[host_url] = status
end


-- Returns the configured hosts that check_host_health currently accepts.
local function get_healthy_hosts()
    local hosts = local_conf.discovery.nacos.host
    local healthy_hosts = {}
    for _, host in ipairs(hosts) do
        if check_host_health(host) then
            core.table.insert(healthy_hosts, host)
        end
    end
    return healthy_hosts
end


-- Splits a configured host URL into the request base URI and optional
-- credentials. A "scheme://user:pass@host" form has its userinfo removed and
-- returned as username/password; the configured prefix is appended and the
-- result always ends with '/'.
local function parse_host(host_url)
    local url = host_url
    local username, password
    local auth_idx = core.string.rfind_char(url, '@')
    if auth_idx then
        local protocol_idx = str_find(url, '://')
        local protocol = string_sub(url, 1, protocol_idx + 2)
        local user_and_password = string_sub(url, protocol_idx + 3, auth_idx - 1)
        local arr = ngx_re.split(user_and_password, ':')
        if #arr == 2 then
            username = arr[1]
            password = arr[2]
        end
        local other = string_sub(url, auth_idx + 1)
        url = protocol .. other
    end

    if local_conf.discovery.nacos.prefix then
        url = url .. local_conf.discovery.nacos.prefix
    end

    if str_byte(url, #url) ~= str_byte('/') then
        url = url .. '/'
    end

    return url, username, password
end


-- Periodic task (started only in worker 0): probes every configured host in
-- its own zero-delay timer and records the outcome in host_health_status.
local function health_check_task(premature)
    if premature then
        return
    end

    local hosts = local_conf.discovery.nacos.host
    for _, host_url in ipairs(hosts) do
        local ok, err = ngx_timer_at(0, function(probe_premature)
            if probe_premature then
                return
            end

            local httpc = http.new()
            httpc:set_timeout(health_check_timeout)

            -- Build the probe URL like regular requests: credentials
            -- stripped, prefix appended, trailing '/' guaranteed.
            -- NOTE(review): the probe carries no accessToken; with nacos
            -- auth enabled it may get a non-200 and count as a failure --
            -- confirm against the target deployment.
            local base_uri = parse_host(host_url)
            local health_url = base_uri .. "cs/history?search=accurate&dataId=1&group=1"

            local res, req_err = httpc:request_uri(health_url, {
                method = "GET",
                ssl_verify = false
            })

            if res and res.status == 200 then
                mark_host_healthy(host_url)
            else
                mark_host_unhealthy(host_url, req_err or
                    "Health check failed with status: " .. (res and res.status or "unknown"))
            end
        end)

        if not ok then
            log.error("Failed to create health check timer for host: ", host_url,
                      ", error: ", err)
        end
    end
end


-- Sends an HTTP request to request_uri .. path and decodes the JSON reply.
-- A table body is JSON-encoded. Returns the decoded data, or nil plus an
-- error string on transport failure, non-200 status or invalid JSON.
local function request(request_uri, path, body, method, basic_auth)
    local url = request_uri .. path
    log.info('request url:', url)
    local headers = {}
    headers['Accept'] = 'application/json'

    if basic_auth then
        headers['Authorization'] = basic_auth
    end

    if body and 'table' == type(body) then
        local err
        body, err = core.json.encode(body)
        if not body then
            return nil, 'invalid body : ' .. err
        end
        headers['Content-Type'] = 'application/json'
    end

    local httpc = http.new()
    local timeout = local_conf.discovery.nacos.timeout
    local connect_timeout = timeout.connect
    local send_timeout = timeout.send
    local read_timeout = timeout.read
    log.info('connect_timeout:', connect_timeout, ', send_timeout:', send_timeout,
             ', read_timeout:', read_timeout)
    httpc:set_timeouts(connect_timeout, send_timeout, read_timeout)
    local res, err = httpc:request_uri(url, {
        method = method,
        headers = headers,
        body = body,
        ssl_verify = true,
    })
    if not res then
        return nil, err
    end

    if not res.body or res.status ~= 200 then
        return nil, 'status = ' .. res.status
    end

    local json_str = res.body
    local data, decode_err = core.json.decode(json_str)
    if not data then
        return nil, decode_err
    end
    return data
end


local function get_url(request_uri, path)
    return request(request_uri, path, nil, 'GET', nil)
end


local function post_url(request_uri, path, body)
    return request(request_uri, path, body, 'POST', nil)
end


-- Picks a random host, preferring healthy ones (all hosts if none are
-- healthy), and returns its base URI plus optional username/password.
local function get_base_uri()
    local hosts = local_conf.discovery.nacos.host

    local candidates = get_healthy_hosts()
    if #candidates == 0 then
        candidates = hosts
    end

    local selected_host = candidates[math_random(#candidates)]
    return parse_host(selected_host)
end


-- Logs in when credentials are present and returns the "&accessToken=..."
-- query fragment; returns '' without credentials, or nil plus an error.
local function get_token_param(base_uri, username, password)
    if not username or not password then
        return ''
    end

    local args = { username = username, password = password }
    local data, err = post_url(base_uri, auth_path .. '?' .. ngx.encode_args(args), nil)
    if err then
        -- Never write the password to the log.
        log.error('nacos login fail:', username, ' desc:', err)
        return nil, err
    end

    if not data.accessToken then
        return nil, 'nacos login response has no accessToken'
    end

    return '&accessToken=' .. data.accessToken
end


local function get_namespace_param(namespace_id)
    local param = ''
    if namespace_id then
        local args = { namespaceId = namespace_id }
        param = '&' .. ngx.encode_args(args)
    end
    return param
end


local function get_group_name_param(group_name)
    local param = ''
    if group_name then
        local args = { groupName = group_name }
        param = '&' .. ngx.encode_args(args)
    end
    return param
end


-- True if services already holds an entry with the same namespace, group,
-- service name and scheme.
local function de_duplication(services, namespace_id, group_name, service_name, scheme)
    for _, service in ipairs(services) do
        if service.namespace_id == namespace_id and service.group_name == group_name
                and service.service_name == service_name and service.scheme == scheme then
            return true
        end
    end
    return false
end


-- Appends to services every distinct nacos-discovered upstream found in the
-- given config values (routes, services, upstreams, stream routes).
local function iter_and_add_service(services, values)
    if not values then
        return
    end

    for _, value in core.config_util.iterate_values(values) do
        local conf = value.value
        if not conf then
            goto CONTINUE
        end

        local up
        if conf.upstream then
            up = conf.upstream
        else
            up = conf
        end

        local namespace_id = (up.discovery_args and up.discovery_args.namespace_id)
                             or default_namespace_id
        local group_name = (up.discovery_args and up.discovery_args.group_name)
                           or default_group_name

        local dup = de_duplication(services, namespace_id, group_name,
                                   up.service_name, up.scheme)
        if dup then
            goto CONTINUE
        end

        if up.discovery_type == 'nacos' then
            core.table.insert(services, {
                service_name = up.service_name,
                namespace_id = namespace_id,
                group_name = group_name,
                scheme = up.scheme,
            })
        end
        ::CONTINUE::
    end
end


-- Collects the nacos services referenced by upstreams, routes, services and
-- stream routes.
local function get_nacos_services()
    local services = {}

    -- here we use lazy load to work around circle dependency
    local get_upstreams = require('apisix.upstream').upstreams
    local get_routes = require('apisix.router').http_routes
    local get_stream_routes = require('apisix.router').stream_routes
    local get_services = require('apisix.http.service').services
    local values = get_upstreams()
    iter_and_add_service(services, values)
    values = get_routes()
    iter_and_add_service(services, values)
    values = get_services()
    iter_and_add_service(services, values)
    values = get_stream_routes()
    iter_and_add_service(services, values)
    return services
end


local function is_grpc(scheme)
    if scheme == 'grpc' or scheme == 'grpcs' then
        return true
    end
    return false
end


-- True when the cache is invalid or older than max_cache_age seconds.
local function is_cache_expired()
    if not cache_valid or not last_successful_update then
        return true
    end

    local now = ngx.now()
    return (now - last_successful_update) > max_cache_age
end


-- Replaces applications with a deep copy of the cache if the cache exists
-- and has not expired; returns whether it did so.
local function use_cached_data()
    if cached_applications and not is_cache_expired() then
        applications = core.json.decode(core.json.encode(cached_applications))
        log.warn("Using cached service discovery data due to all nacos nodes failure")
        return true
    end
    return false
end


-- Stores freshly fetched data. The cache and its timestamp are refreshed on
-- every call, even when nothing changed; otherwise stable data would be
-- reported as expired after max_cache_age. Other workers are notified only
-- when the data actually changed.
local function update_applications(new_apps)
    local new_apps_json = core.json.encode(new_apps)

    cached_applications = core.json.decode(new_apps_json) -- deep copy
    cache_valid = true
    last_successful_update = ngx.now()

    local new_apps_md5sum = ngx.md5(new_apps_json)
    local old_apps_md5sum = ngx.md5(core.json.encode(applications or {}))
    if new_apps_md5sum == old_apps_md5sum then
        return
    end

    applications = new_apps
    local ok, err = events.post(events_list._source, events_list.updating,
                                applications)
    if not ok then
        log.error("post_event failure with ", events_list._source,
                  ", update application error: ", err)
    else
        log.info("Successfully updated applications data and cache")
    end
end


-- Timer callback (worker 0): fetches instances of every referenced nacos
-- service. A service whose fetch fails is skipped (it is absent from this
-- round's data); if every service fails, the cache is used instead.
local function fetch_full_registry(premature)
    if premature then
        return
    end

    local up_apps = {}
    local base_uri, username, password = get_base_uri()
    local token_param, err = get_token_param(base_uri, username, password)
    if err then
        log.error('get_token_param error:', err)
        -- Login failed: on the very first fetch fall back to the cache, or
        -- to an empty table so _M.nodes stops waiting.
        if not applications then
            if use_cached_data() then
                log.warn("Using cached data for initial setup")
            else
                applications = up_apps
            end
        end
        return
    end

    local infos = get_nacos_services()
    if #infos == 0 then
        applications = up_apps
        return
    end

    local success_count = 0
    local total_services = #infos
    for _, service_info in ipairs(infos) do
        local namespace_id = service_info.namespace_id
        local group_name = service_info.group_name
        local service_name = service_info.service_name
        local scheme = service_info.scheme or ''
        local namespace_param = get_namespace_param(namespace_id)
        local group_name_param = get_group_name_param(group_name)
        local query_path = instance_list_path .. service_name
                           .. token_param .. namespace_param .. group_name_param
        local data, fetch_err = get_url(base_uri, query_path)
        if fetch_err then
            log.error('get_url:', query_path, ' err:', fetch_err)
            goto CONTINUE
        end

        -- ipairs(nil) would raise and abort the whole fetch round.
        if type(data.hosts) ~= 'table' then
            log.error('get_url:', query_path, ' err:', 'response has no hosts list')
            goto CONTINUE
        end

        if not up_apps[namespace_id] then
            up_apps[namespace_id] = {}
        end

        if not up_apps[namespace_id][group_name] then
            up_apps[namespace_id][group_name] = {}
        end

        for _, host in ipairs(data.hosts) do
            local nodes = up_apps[namespace_id][group_name][service_name]
            if not nodes then
                nodes = {}
                up_apps[namespace_id][group_name][service_name] = nodes
            end

            local node = {
                host = host.ip,
                port = host.port,
                weight = host.weight or default_weight,
            }

            -- docs: https://github.com/yidongnan/grpc-spring-boot-starter/pull/496
            if is_grpc(scheme) and host.metadata and host.metadata.gRPC_port then
                node.port = host.metadata.gRPC_port
            end

            core.table.insert(nodes, node)
        end

        success_count = success_count + 1

        -- The label must close the loop body: placed after the loop, a
        -- single failure would skip every remaining service and the update.
        ::CONTINUE::
    end

    log.info("Service discovery completed: ", success_count, "/", total_services,
             " services updated")

    if success_count > 0 then
        -- Some or all services were fetched successfully.
        update_applications(up_apps)
    else
        -- Every service failed: try the cached data.
        log.error("Failed to fetch any service information from all nacos nodes")
        if use_cached_data() then
            log.warn("Successfully fallback to cached service discovery data")
        else
            log.error("No valid cache available, service discovery data may be stale or empty")
        end
    end
end


-- Returns the node list for service_name in the namespace/group given by
-- discovery_args (defaults: "public" / "DEFAULT_GROUP"), waiting up to 5 s
-- for the first fetch; nil when unknown.
function _M.nodes(service_name, discovery_args)
    local namespace_id = discovery_args and
            discovery_args.namespace_id or default_namespace_id
    local group_name = discovery_args
            and discovery_args.group_name or default_group_name

    local logged = false
    -- maximum waiting time: 5 seconds
    local waiting_time = 5
    local step = 0.1
    while not applications and waiting_time > 0 do
        if not logged then
            log.warn('wait init')
            logged = true
        end
        ngx.sleep(step)
        waiting_time = waiting_time - step
    end

    if not applications or not applications[namespace_id]
            or not applications[namespace_id][group_name] then
        return nil
    end
    return applications[namespace_id][group_name][service_name]
end


-- Worker initialisation. Workers other than 0 only subscribe to update
-- events; worker 0 initialises host health and starts the fetch and
-- health-check timers.
function _M.init_worker()
    events = require("resty.worker.events")
    events_list = events.event_list("discovery_nacos_update_application",
                                    "updating")

    if 0 ~= ngx.worker.id() then
        events.register(discovery_nacos_callback, events_list._source,
                        events_list.updating)
        return
    end

    default_weight = local_conf.discovery.nacos.weight
    log.info('default_weight:', default_weight)
    local fetch_interval = local_conf.discovery.nacos.fetch_interval
    log.info('fetch_interval:', fetch_interval)
    -- NOTE(review): access_key / secret_key are not read anywhere else in
    -- this module.
    access_key = local_conf.discovery.nacos.access_key
    secret_key = local_conf.discovery.nacos.secret_key

    -- Every configured host starts out healthy.
    local hosts = local_conf.discovery.nacos.host
    for _, host in ipairs(hosts) do
        host_health_status[host] = {
            healthy = true,
            failure_count = 0,
            last_check_time = ngx.now() * 1000
        }
    end

    -- Initial fetch plus periodic refresh.
    ngx_timer_at(0, fetch_full_registry)
    ngx_timer_every(fetch_interval, fetch_full_registry)

    -- Periodic health probes (interval converted from ms to seconds).
    ngx_timer_every(health_check_interval / 1000, health_check_task)

    log.info("Nacos discovery initialized with health check and cache support")
end


function _M.dump_data()
    return {
        config = local_conf.discovery.nacos,
        services = applications or {},
        cache = {
            cached_applications = cached_applications and true or false,
            cache_valid = cache_valid,
            last_successful_update = last_successful_update,
            is_cache_expired = is_cache_expired()
        }
    }
end


-- Returns the per-host health records (for debugging).
function _M.get_health_status()
    return host_health_status
end


-- Clears the cache manually (for testing and debugging).
function _M.clear_cache()
    cached_applications = nil
    cache_valid = false
    last_successful_update = 0
    log.info("Cache cleared manually")
end


-- Returns a summary of the cache state.
function _M.get_cache_status()
    return {
        cache_valid = cache_valid,
        last_successful_update = last_successful_update,
        is_cache_expired = is_cache_expired(),
        cached_applications = cached_applications and true or false
    }
end


return _M
```
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
