Copilot commented on code in PR #12426: URL: https://github.com/apache/apisix/pull/12426#discussion_r2218523547
########## apisix/healthcheck_manager.lua: ########## @@ -0,0 +1,297 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- +local require = require +local ipairs = ipairs +local pcall = pcall +local exiting = ngx.worker.exiting +local pairs = pairs +local tostring = tostring +local core = require("apisix.core") +local config_local = require("apisix.core.config_local") +local healthcheck +local events = require("apisix.events") +local tab_clone = core.table.clone +local timer_every = ngx.timer.every +local _M = { + working_pool = {}, -- resource_path -> {version = ver, checker = checker} + waiting_pool = {} -- resource_path -> resource_ver +} +local healthcheck_shdict_name = "upstream-healthcheck" +local is_http = ngx.config.subsystem == "http" +if not is_http then + healthcheck_shdict_name = healthcheck_shdict_name .. "-" .. ngx.config.subsystem +end + + +local function get_healthchecker_name(value) + return "upstream#" .. (value.resource_key or value.upstream.resource_key) +end +_M.get_healthchecker_name = get_healthchecker_name + + +local function fetch_latest_conf(resource_path) + local resource_type, id + -- Handle both formats: + -- 1. /apisix/<resource_type>/<id> + -- 2. /<resource_type>/<id> + if resource_path:find("^/apisix/") then + resource_type, id = resource_path:match("^/apisix/([^/]+)/([^/]+)$") + else + resource_type, id = resource_path:match("^/([^/]+)/([^/]+)$") + end + if not resource_type or not id then + core.log.error("invalid resource path: ", resource_path) + return nil + end + + local key + if resource_type == "upstreams" then + key = "/upstreams" + elseif resource_type == "routes" then + key = "/routes" + elseif resource_type == "services" then + key = "/services" + elseif resource_type == "stream_routes" then + key = "/stream_routes" + else + core.log.error("unsupported resource type: ", resource_type) + return nil + end + + local data = core.config.fetch_created_obj(key) + if not data then + core.log.error("failed to fetch configuration for type: ", key) + return nil + end + local resource = data:get(id) + if not resource then + -- this can happen if the resource was deleted + -- after the this function was called so we don't throw error + core.log.warn("resource not found: ", id, " in ", key) + return nil + end + + return resource +end + + +local function create_checker(up_conf) + local local_conf = config_local.local_conf() + if local_conf and local_conf.apisix and local_conf.apisix.disable_upstream_healthcheck then + core.log.info("healthchecker won't be created: disabled upstream healthcheck") + return nil + end + core.log.info("creating healthchecker for upstream: ", up_conf.resource_key) + if not healthcheck then + healthcheck = require("resty.healthcheck") + end + + local checker, err = healthcheck.new({ + name = get_healthchecker_name(up_conf), + shm_name = healthcheck_shdict_name, + checks = up_conf.checks, + events_module = events:get_healthcheck_events_modele(), + }) + + if not checker then + core.log.error("failed to create healthcheck: ", err) + return nil + end + + -- Add target nodes + local host = up_conf.checks and up_conf.checks.active and up_conf.checks.active.host + local port = up_conf.checks and up_conf.checks.active and up_conf.checks.active.port + local up_hdr = up_conf.pass_host == "rewrite" and up_conf.upstream_host + local use_node_hdr = up_conf.pass_host == "node" or nil + + for _, node in ipairs(up_conf.nodes) do + local host_hdr = up_hdr or (use_node_hdr and node.domain) + local ok, err = checker:add_target(node.host, port or node.port, host, + true, host_hdr) + if not ok then + core.log.error("failed to add healthcheck target: ", node.host, ":", + port or node.port, " err: ", err) + end + end + + return checker +end + + +function _M.fetch_checker(resource_path, resource_ver) + local working_item = _M.working_pool[resource_path] + if working_item and working_item.version == resource_ver then + return working_item.checker + end + + if _M.waiting_pool[resource_path] == resource_ver then + return nil + end + + -- Add to waiting pool with version + core.log.info("adding ", resource_path, " to waiting pool with version: ", resource_ver) + _M.waiting_pool[resource_path] = resource_ver + return nil +end + + +function _M.fetch_node_status(checker, ip, port, hostname) + -- check if the checker is valid + if not checker or checker.dead then + return true + end + + return checker:get_target_status(ip, port, hostname) +end + + +local function add_working_pool(resource_path, resource_ver, checker) + _M.working_pool[resource_path] = { + version = resource_ver, + checker = checker + } +end + +local function find_in_working_pool(resource_path, resource_ver) + local checker = _M.working_pool[resource_path] + if not checker then + return nil -- not found + end + + if checker.version ~= resource_ver then + core.log.info("version mismatch for resource: ", resource_path, + " current version: ", checker.version, " requested version: ", resource_ver) + return nil -- version not match + end + return checker +end + + +function _M.upstream_version(index, nodes_ver) + if not index then + return + end + return index .. tostring(nodes_ver or '') +end + + +function _M.timer_create_checker() + if core.table.nkeys(_M.waiting_pool) == 0 then + return + end + + local waiting_snapshot = tab_clone(_M.waiting_pool) + for resource_path, resource_ver in pairs(waiting_snapshot) do + do + if find_in_working_pool(resource_path, resource_ver) then + core.log.info("resource: ", resource_path, + " already in working pool with version: ", + resource_ver) + goto continue + end + local res_conf = fetch_latest_conf(resource_path) + if not res_conf then + goto continue + end + local upstream = res_conf.value.upstream or res_conf.value + local new_version = _M.upstream_version(res_conf.modifiedIndex, upstream._nodes_ver) + core.log.info("checking waiting pool for resource: ", resource_path, + " current version: ", new_version, " requested version: ", resource_ver) + if resource_ver ~= new_version then + goto continue + end + + -- if a checker exists then delete it before creating a new one + local existing_checker = _M.working_pool[resource_path] + if existing_checker then + existing_checker.checker:delayed_clear(10) Review Comment: The magic number 10 (seconds) for delayed_clear should be defined as a named constant to improve maintainability and make the cleanup timeout configurable. ```suggestion existing_checker.checker:delayed_clear(DELAYED_CLEAR_TIMEOUT) ``` ########## apisix/healthcheck_manager.lua: ########## @@ -0,0 +1,297 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- +local require = require +local ipairs = ipairs +local pcall = pcall +local exiting = ngx.worker.exiting +local pairs = pairs +local tostring = tostring +local core = require("apisix.core") +local config_local = require("apisix.core.config_local") +local healthcheck +local events = require("apisix.events") +local tab_clone = core.table.clone +local timer_every = ngx.timer.every +local _M = { + working_pool = {}, -- resource_path -> {version = ver, checker = checker} + waiting_pool = {} -- resource_path -> resource_ver +} +local healthcheck_shdict_name = "upstream-healthcheck" +local is_http = ngx.config.subsystem == "http" +if not is_http then + healthcheck_shdict_name = healthcheck_shdict_name .. "-" .. ngx.config.subsystem +end + + +local function get_healthchecker_name(value) + return "upstream#" .. (value.resource_key or value.upstream.resource_key) +end +_M.get_healthchecker_name = get_healthchecker_name + + +local function fetch_latest_conf(resource_path) + local resource_type, id + -- Handle both formats: + -- 1. /apisix/<resource_type>/<id> + -- 2. /<resource_type>/<id> + if resource_path:find("^/apisix/") then + resource_type, id = resource_path:match("^/apisix/([^/]+)/([^/]+)$") + else + resource_type, id = resource_path:match("^/([^/]+)/([^/]+)$") + end + if not resource_type or not id then + core.log.error("invalid resource path: ", resource_path) + return nil + end + + local key + if resource_type == "upstreams" then + key = "/upstreams" + elseif resource_type == "routes" then + key = "/routes" + elseif resource_type == "services" then + key = "/services" + elseif resource_type == "stream_routes" then + key = "/stream_routes" + else + core.log.error("unsupported resource type: ", resource_type) + return nil + end + + local data = core.config.fetch_created_obj(key) + if not data then + core.log.error("failed to fetch configuration for type: ", key) + return nil + end + local resource = data:get(id) + if not resource then + -- this can happen if the resource was deleted + -- after the this function was called so we don't throw error + core.log.warn("resource not found: ", id, " in ", key) + return nil + end + + return resource +end + + +local function create_checker(up_conf) + local local_conf = config_local.local_conf() + if local_conf and local_conf.apisix and local_conf.apisix.disable_upstream_healthcheck then + core.log.info("healthchecker won't be created: disabled upstream healthcheck") + return nil + end + core.log.info("creating healthchecker for upstream: ", up_conf.resource_key) + if not healthcheck then + healthcheck = require("resty.healthcheck") + end + + local checker, err = healthcheck.new({ + name = get_healthchecker_name(up_conf), + shm_name = healthcheck_shdict_name, + checks = up_conf.checks, + events_module = events:get_healthcheck_events_modele(), + }) + + if not checker then + core.log.error("failed to create healthcheck: ", err) + return nil + end + + -- Add target nodes + local host = up_conf.checks and up_conf.checks.active and up_conf.checks.active.host + local port = up_conf.checks and up_conf.checks.active and up_conf.checks.active.port + local up_hdr = up_conf.pass_host == "rewrite" and up_conf.upstream_host + local use_node_hdr = up_conf.pass_host == "node" or nil + + for _, node in ipairs(up_conf.nodes) do + local host_hdr = up_hdr or (use_node_hdr and node.domain) + local ok, err = checker:add_target(node.host, port or node.port, host, + true, host_hdr) + if not ok then + core.log.error("failed to add healthcheck target: ", node.host, ":", + port or node.port, " err: ", err) + end + end + + return checker +end + + +function _M.fetch_checker(resource_path, resource_ver) + local working_item = _M.working_pool[resource_path] + if working_item and working_item.version == resource_ver then + return working_item.checker + end + + if _M.waiting_pool[resource_path] == resource_ver then + return nil + end + + -- Add to waiting pool with version + core.log.info("adding ", resource_path, " to waiting pool with version: ", resource_ver) + _M.waiting_pool[resource_path] = resource_ver + return nil +end + + +function _M.fetch_node_status(checker, ip, port, hostname) + -- check if the checker is valid + if not checker or checker.dead then + return true + end + + return checker:get_target_status(ip, port, hostname) +end + + +local function add_working_pool(resource_path, resource_ver, checker) + _M.working_pool[resource_path] = { + version = resource_ver, + checker = checker + } +end + +local function find_in_working_pool(resource_path, resource_ver) + local checker = _M.working_pool[resource_path] + if not checker then + return nil -- not found + end + + if checker.version ~= resource_ver then + core.log.info("version mismatch for resource: ", resource_path, + " current version: ", checker.version, " requested version: ", resource_ver) + return nil -- version not match + end + return checker +end + + +function _M.upstream_version(index, nodes_ver) + if not index then + return + end + return index .. tostring(nodes_ver or '') +end + + +function _M.timer_create_checker() + if core.table.nkeys(_M.waiting_pool) == 0 then + return + end + + local waiting_snapshot = tab_clone(_M.waiting_pool) + for resource_path, resource_ver in pairs(waiting_snapshot) do + do + if find_in_working_pool(resource_path, resource_ver) then + core.log.info("resource: ", resource_path, + " already in working pool with version: ", + resource_ver) + goto continue + end + local res_conf = fetch_latest_conf(resource_path) + if not res_conf then + goto continue + end + local upstream = res_conf.value.upstream or res_conf.value + local new_version = _M.upstream_version(res_conf.modifiedIndex, upstream._nodes_ver) + core.log.info("checking waiting pool for resource: ", resource_path, + " current version: ", new_version, " requested version: ", resource_ver) + if resource_ver ~= new_version then + goto continue + end + + -- if a checker exists then delete it before creating a new one + local existing_checker = _M.working_pool[resource_path] + if existing_checker then + existing_checker.checker:delayed_clear(10) + existing_checker.checker:stop() + core.log.info("releasing existing checker: ", tostring(existing_checker.checker)) + end + local checker = create_checker(upstream) + if not checker then + goto continue + end + core.log.info("create new checker: ", tostring(checker)) + add_working_pool(resource_path, resource_ver, checker) + end + + ::continue:: + _M.waiting_pool[resource_path] = nil + end +end + + +function _M.timer_working_pool_check() + if core.table.nkeys(_M.working_pool) == 0 then + return + end + + local working_snapshot = tab_clone(_M.working_pool) + for resource_path, item in pairs(working_snapshot) do + --- remove from working pool if resource doesn't exist + local res_conf = fetch_latest_conf(resource_path) + if not res_conf then + item.checker:delayed_clear(10) Review Comment: The magic number 10 (seconds) for delayed_clear should be defined as a named constant to improve maintainability and make the cleanup timeout configurable. ```suggestion item.checker:delayed_clear(DELAYED_CLEAR_TIMEOUT) ``` ########## apisix/healthcheck_manager.lua: ########## @@ -0,0 +1,297 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- +local require = require +local ipairs = ipairs +local pcall = pcall +local exiting = ngx.worker.exiting +local pairs = pairs +local tostring = tostring +local core = require("apisix.core") +local config_local = require("apisix.core.config_local") +local healthcheck +local events = require("apisix.events") +local tab_clone = core.table.clone +local timer_every = ngx.timer.every +local _M = { + working_pool = {}, -- resource_path -> {version = ver, checker = checker} + waiting_pool = {} -- resource_path -> resource_ver +} +local healthcheck_shdict_name = "upstream-healthcheck" +local is_http = ngx.config.subsystem == "http" +if not is_http then + healthcheck_shdict_name = healthcheck_shdict_name .. "-" .. ngx.config.subsystem +end + + +local function get_healthchecker_name(value) + return "upstream#" .. (value.resource_key or value.upstream.resource_key) +end +_M.get_healthchecker_name = get_healthchecker_name + + +local function fetch_latest_conf(resource_path) + local resource_type, id + -- Handle both formats: + -- 1. /apisix/<resource_type>/<id> + -- 2. /<resource_type>/<id> + if resource_path:find("^/apisix/") then + resource_type, id = resource_path:match("^/apisix/([^/]+)/([^/]+)$") + else + resource_type, id = resource_path:match("^/([^/]+)/([^/]+)$") + end + if not resource_type or not id then + core.log.error("invalid resource path: ", resource_path) + return nil + end + + local key + if resource_type == "upstreams" then + key = "/upstreams" + elseif resource_type == "routes" then + key = "/routes" + elseif resource_type == "services" then + key = "/services" + elseif resource_type == "stream_routes" then + key = "/stream_routes" + else + core.log.error("unsupported resource type: ", resource_type) + return nil + end + + local data = core.config.fetch_created_obj(key) + if not data then + core.log.error("failed to fetch configuration for type: ", key) + return nil + end + local resource = data:get(id) + if not resource then + -- this can happen if the resource was deleted + -- after the this function was called so we don't throw error + core.log.warn("resource not found: ", id, " in ", key) + return nil + end + + return resource +end + + +local function create_checker(up_conf) + local local_conf = config_local.local_conf() + if local_conf and local_conf.apisix and local_conf.apisix.disable_upstream_healthcheck then + core.log.info("healthchecker won't be created: disabled upstream healthcheck") + return nil + end + core.log.info("creating healthchecker for upstream: ", up_conf.resource_key) + if not healthcheck then + healthcheck = require("resty.healthcheck") + end + + local checker, err = healthcheck.new({ + name = get_healthchecker_name(up_conf), + shm_name = healthcheck_shdict_name, + checks = up_conf.checks, + events_module = events:get_healthcheck_events_modele(), + }) + + if not checker then + core.log.error("failed to create healthcheck: ", err) + return nil + end + + -- Add target nodes + local host = up_conf.checks and up_conf.checks.active and up_conf.checks.active.host + local port = up_conf.checks and up_conf.checks.active and up_conf.checks.active.port + local up_hdr = up_conf.pass_host == "rewrite" and up_conf.upstream_host + local use_node_hdr = up_conf.pass_host == "node" or nil + + for _, node in ipairs(up_conf.nodes) do + local host_hdr = up_hdr or (use_node_hdr and node.domain) + local ok, err = checker:add_target(node.host, port or node.port, host, + true, host_hdr) + if not ok then + core.log.error("failed to add healthcheck target: ", node.host, ":", + port or node.port, " err: ", err) + end + end + + return checker +end + + +function _M.fetch_checker(resource_path, resource_ver) + local working_item = _M.working_pool[resource_path] + if working_item and working_item.version == resource_ver then + return working_item.checker + end + + if _M.waiting_pool[resource_path] == resource_ver then + return nil + end + + -- Add to waiting pool with version + core.log.info("adding ", resource_path, " to waiting pool with version: ", resource_ver) + _M.waiting_pool[resource_path] = resource_ver + return nil +end + + +function _M.fetch_node_status(checker, ip, port, hostname) + -- check if the checker is valid + if not checker or checker.dead then + return true + end + + return checker:get_target_status(ip, port, hostname) +end + + +local function add_working_pool(resource_path, resource_ver, checker) + _M.working_pool[resource_path] = { + version = resource_ver, + checker = checker + } +end + +local function find_in_working_pool(resource_path, resource_ver) + local checker = _M.working_pool[resource_path] + if not checker then + return nil -- not found + end + + if checker.version ~= resource_ver then + core.log.info("version mismatch for resource: ", resource_path, + " current version: ", checker.version, " requested version: ", resource_ver) + return nil -- version not match + end + return checker +end + + +function _M.upstream_version(index, nodes_ver) + if not index then + return + end + return index .. tostring(nodes_ver or '') +end + + +function _M.timer_create_checker() + if core.table.nkeys(_M.waiting_pool) == 0 then + return + end + + local waiting_snapshot = tab_clone(_M.waiting_pool) + for resource_path, resource_ver in pairs(waiting_snapshot) do + do + if find_in_working_pool(resource_path, resource_ver) then + core.log.info("resource: ", resource_path, + " already in working pool with version: ", + resource_ver) + goto continue + end + local res_conf = fetch_latest_conf(resource_path) + if not res_conf then + goto continue + end + local upstream = res_conf.value.upstream or res_conf.value + local new_version = _M.upstream_version(res_conf.modifiedIndex, upstream._nodes_ver) + core.log.info("checking waiting pool for resource: ", resource_path, + " current version: ", new_version, " requested version: ", resource_ver) + if resource_ver ~= new_version then + goto continue + end + + -- if a checker exists then delete it before creating a new one + local existing_checker = _M.working_pool[resource_path] + if existing_checker then + existing_checker.checker:delayed_clear(10) + existing_checker.checker:stop() + core.log.info("releasing existing checker: ", tostring(existing_checker.checker)) + end + local checker = create_checker(upstream) + if not checker then + goto continue + end + core.log.info("create new checker: ", tostring(checker)) + add_working_pool(resource_path, resource_ver, checker) + end + + ::continue:: + _M.waiting_pool[resource_path] = nil + end +end + + +function _M.timer_working_pool_check() + if core.table.nkeys(_M.working_pool) == 0 then + return + end + + local working_snapshot = tab_clone(_M.working_pool) + for resource_path, item in pairs(working_snapshot) do + --- remove from working pool if resource doesn't exist + local res_conf = fetch_latest_conf(resource_path) + if not res_conf then + item.checker:delayed_clear(10) + item.checker:stop() + core.log.info("try to release checker: ", tostring(item.checker)) + _M.working_pool[resource_path] = nil + goto continue + end Review Comment: The code assumes res_conf.value exists but doesn't validate it before accessing res_conf.value._nodes_ver on the next line, which could cause a nil access error. ```suggestion end if not res_conf.value then item.checker:delayed_clear(10) item.checker:stop() core.log.info("try to release checker due to missing value: ", tostring(item.checker)) _M.working_pool[resource_path] = nil goto continue end ``` ########## apisix/healthcheck_manager.lua: ########## @@ -0,0 +1,297 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- +local require = require +local ipairs = ipairs +local pcall = pcall +local exiting = ngx.worker.exiting +local pairs = pairs +local tostring = tostring +local core = require("apisix.core") +local config_local = require("apisix.core.config_local") +local healthcheck +local events = require("apisix.events") +local tab_clone = core.table.clone +local timer_every = ngx.timer.every +local _M = { + working_pool = {}, -- resource_path -> {version = ver, checker = checker} + waiting_pool = {} -- resource_path -> resource_ver +} +local healthcheck_shdict_name = "upstream-healthcheck" +local is_http = ngx.config.subsystem == "http" +if not is_http then + healthcheck_shdict_name = healthcheck_shdict_name .. "-" .. ngx.config.subsystem +end + + +local function get_healthchecker_name(value) + return "upstream#" .. (value.resource_key or value.upstream.resource_key) +end +_M.get_healthchecker_name = get_healthchecker_name + + +local function fetch_latest_conf(resource_path) + local resource_type, id + -- Handle both formats: + -- 1. /apisix/<resource_type>/<id> + -- 2. /<resource_type>/<id> + if resource_path:find("^/apisix/") then + resource_type, id = resource_path:match("^/apisix/([^/]+)/([^/]+)$") + else + resource_type, id = resource_path:match("^/([^/]+)/([^/]+)$") + end + if not resource_type or not id then + core.log.error("invalid resource path: ", resource_path) + return nil + end + + local key + if resource_type == "upstreams" then + key = "/upstreams" + elseif resource_type == "routes" then + key = "/routes" + elseif resource_type == "services" then + key = "/services" + elseif resource_type == "stream_routes" then + key = "/stream_routes" + else + core.log.error("unsupported resource type: ", resource_type) + return nil + end + + local data = core.config.fetch_created_obj(key) + if not data then + core.log.error("failed to fetch configuration for type: ", key) + return nil + end + local resource = data:get(id) + if not resource then + -- this can happen if the resource was deleted + -- after the this function was called so we don't throw error + core.log.warn("resource not found: ", id, " in ", key) + return nil Review Comment: When a resource is not found, the function returns nil without providing context to the caller about whether this was due to a deleted resource or a configuration error. ```suggestion return nil, "unsupported resource type" end local data = core.config.fetch_created_obj(key) if not data then core.log.error("failed to fetch configuration for type: ", key) return nil, "failed to fetch configuration" end local resource = data:get(id) if not resource then -- this can happen if the resource was deleted -- after the this function was called so we don't throw error core.log.warn("resource not found: ", id, " in ", key) return nil, "resource not found" ``` ########## apisix/healthcheck_manager.lua: ########## @@ -0,0 +1,297 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- +local require = require +local ipairs = ipairs +local pcall = pcall +local exiting = ngx.worker.exiting +local pairs = pairs +local tostring = tostring +local core = require("apisix.core") +local config_local = require("apisix.core.config_local") +local healthcheck +local events = require("apisix.events") +local tab_clone = core.table.clone +local timer_every = ngx.timer.every +local _M = { + working_pool = {}, -- resource_path -> {version = ver, checker = checker} + waiting_pool = {} -- resource_path -> resource_ver +} +local healthcheck_shdict_name = "upstream-healthcheck" +local is_http = ngx.config.subsystem == "http" +if not is_http then + healthcheck_shdict_name = healthcheck_shdict_name .. "-" .. ngx.config.subsystem +end + + +local function get_healthchecker_name(value) + return "upstream#" .. (value.resource_key or value.upstream.resource_key) +end +_M.get_healthchecker_name = get_healthchecker_name + + +local function fetch_latest_conf(resource_path) + local resource_type, id + -- Handle both formats: + -- 1. /apisix/<resource_type>/<id> + -- 2. /<resource_type>/<id> + if resource_path:find("^/apisix/") then + resource_type, id = resource_path:match("^/apisix/([^/]+)/([^/]+)$") + else + resource_type, id = resource_path:match("^/([^/]+)/([^/]+)$") + end + if not resource_type or not id then + core.log.error("invalid resource path: ", resource_path) + return nil + end + + local key + if resource_type == "upstreams" then + key = "/upstreams" + elseif resource_type == "routes" then + key = "/routes" + elseif resource_type == "services" then + key = "/services" + elseif resource_type == "stream_routes" then + key = "/stream_routes" + else + core.log.error("unsupported resource type: ", resource_type) + return nil + end + + local data = core.config.fetch_created_obj(key) + if not data then + core.log.error("failed to fetch configuration for type: ", key) + return nil + end + local resource = data:get(id) + if not resource then + -- this can happen if the resource was deleted + -- after the this function was called so we don't throw error + core.log.warn("resource not found: ", id, " in ", key) + return nil + end + + return resource +end + + +local function create_checker(up_conf) + local local_conf = config_local.local_conf() + if local_conf and local_conf.apisix and local_conf.apisix.disable_upstream_healthcheck then + core.log.info("healthchecker won't be created: disabled upstream healthcheck") + return nil + end + core.log.info("creating healthchecker for upstream: ", up_conf.resource_key) + if not healthcheck then + healthcheck = require("resty.healthcheck") + end + + local checker, err = healthcheck.new({ + name = get_healthchecker_name(up_conf), + shm_name = healthcheck_shdict_name, + checks = up_conf.checks, + events_module = events:get_healthcheck_events_modele(), + }) + + if not checker then + core.log.error("failed to create healthcheck: ", err) + return nil + end + + -- Add target nodes + local host = up_conf.checks and up_conf.checks.active and up_conf.checks.active.host + local port = up_conf.checks and up_conf.checks.active and up_conf.checks.active.port + local up_hdr = up_conf.pass_host == "rewrite" and up_conf.upstream_host + local use_node_hdr = up_conf.pass_host == "node" or nil + + for _, node in ipairs(up_conf.nodes) do + local host_hdr = up_hdr or (use_node_hdr and node.domain) + local ok, err = checker:add_target(node.host, port or node.port, host, + true, host_hdr) + if not ok then + core.log.error("failed to add healthcheck target: ", node.host, ":", + port or node.port, " err: ", err) + end + end + + return checker +end + + +function _M.fetch_checker(resource_path, resource_ver) + local working_item = _M.working_pool[resource_path] + if working_item and working_item.version == resource_ver then + return working_item.checker + end + + if _M.waiting_pool[resource_path] == resource_ver then + return nil + end + + -- Add to waiting pool with version + core.log.info("adding ", resource_path, " to waiting pool with version: ", resource_ver) + _M.waiting_pool[resource_path] = resource_ver + return nil +end + + +function _M.fetch_node_status(checker, ip, port, hostname) + -- check if the checker is valid + if not checker or checker.dead then + return true + end + + return checker:get_target_status(ip, port, hostname) +end + + +local function add_working_pool(resource_path, resource_ver, checker) + _M.working_pool[resource_path] = { + version = resource_ver, + checker = checker + } +end + +local function find_in_working_pool(resource_path, resource_ver) + local checker = _M.working_pool[resource_path] + if not checker then + return nil -- not found + end + + if checker.version ~= resource_ver then + core.log.info("version mismatch for resource: ", resource_path, + " current version: ", checker.version, " requested version: ", resource_ver) + return nil -- version not match + end + return checker +end + + +function _M.upstream_version(index, nodes_ver) + if not index then + return + end + return index .. tostring(nodes_ver or '') +end + + +function _M.timer_create_checker() + if core.table.nkeys(_M.waiting_pool) == 0 then + return + end + + local waiting_snapshot = tab_clone(_M.waiting_pool) + for resource_path, resource_ver in pairs(waiting_snapshot) do + do + if find_in_working_pool(resource_path, resource_ver) then + core.log.info("resource: ", resource_path, + " already in working pool with version: ", + resource_ver) + goto continue + end + local res_conf = fetch_latest_conf(resource_path) + if not res_conf then + goto continue + end + local upstream = res_conf.value.upstream or res_conf.value + local new_version = _M.upstream_version(res_conf.modifiedIndex, upstream._nodes_ver) + core.log.info("checking waiting pool for resource: ", resource_path, + " current version: ", new_version, " requested version: ", resource_ver) + if resource_ver ~= new_version then + goto continue + end + + -- if a checker exists then delete it before creating a new one + local existing_checker = _M.working_pool[resource_path] + if existing_checker then + existing_checker.checker:delayed_clear(10) + existing_checker.checker:stop() + core.log.info("releasing existing checker: ", tostring(existing_checker.checker)) + end + local checker = create_checker(upstream) + if not checker then + goto continue + end + core.log.info("create new checker: ", tostring(checker)) + add_working_pool(resource_path, resource_ver, checker) + end + + ::continue:: + _M.waiting_pool[resource_path] = nil + end +end + + +function _M.timer_working_pool_check() + if core.table.nkeys(_M.working_pool) == 0 then + return + end + + local working_snapshot = tab_clone(_M.working_pool) + for resource_path, item in pairs(working_snapshot) do + --- remove from working pool if resource doesn't exist + local res_conf = fetch_latest_conf(resource_path) + if not res_conf then + item.checker:delayed_clear(10) + item.checker:stop() + core.log.info("try to release checker: ", tostring(item.checker)) + _M.working_pool[resource_path] = nil + goto continue + end + local current_ver = _M.upstream_version(res_conf.modifiedIndex, + res_conf.value._nodes_ver) + core.log.info("checking working pool for resource: ", resource_path, + " current version: ", current_ver, " item version: ", item.version) + if item.version ~= current_ver then + item.checker:delayed_clear(10) Review Comment: The magic number 10 (seconds) for delayed_clear should be defined as a named constant to improve maintainability and make the cleanup timeout configurable. ```suggestion item.checker:delayed_clear(DELAYED_CLEAR_TIMEOUT) ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@apisix.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org