Copilot commented on code in PR #12426:
URL: https://github.com/apache/apisix/pull/12426#discussion_r2218523547


##########
apisix/healthcheck_manager.lua:
##########
@@ -0,0 +1,297 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local require = require
+local ipairs   = ipairs
+local pcall   = pcall
+local exiting      = ngx.worker.exiting
+local pairs    = pairs
+local tostring = tostring
+local core = require("apisix.core")
+local config_local   = require("apisix.core.config_local")
+local healthcheck
+local events = require("apisix.events")
+local tab_clone = core.table.clone
+local timer_every = ngx.timer.every
+local _M = {
+    working_pool = {},     -- resource_path -> {version = ver, checker = 
checker}
+    waiting_pool = {}      -- resource_path -> resource_ver
+}
+local healthcheck_shdict_name = "upstream-healthcheck"
+local is_http = ngx.config.subsystem == "http"
+if not is_http then
+    healthcheck_shdict_name = healthcheck_shdict_name .. "-" .. 
ngx.config.subsystem
+end
+
+
+local function get_healthchecker_name(value)
+    return "upstream#" .. (value.resource_key or value.upstream.resource_key)
+end
+_M.get_healthchecker_name = get_healthchecker_name
+
+
+local function fetch_latest_conf(resource_path)
+    local resource_type, id
+    -- Handle both formats:
+    -- 1. /apisix/<resource_type>/<id>
+    -- 2. /<resource_type>/<id>
+    if resource_path:find("^/apisix/") then
+        resource_type, id = resource_path:match("^/apisix/([^/]+)/([^/]+)$")
+    else
+        resource_type, id = resource_path:match("^/([^/]+)/([^/]+)$")
+    end
+    if not resource_type or not id then
+        core.log.error("invalid resource path: ", resource_path)
+        return nil
+    end
+
+    local key
+    if resource_type == "upstreams" then
+        key = "/upstreams"
+    elseif resource_type == "routes" then
+        key = "/routes"
+    elseif resource_type == "services" then
+        key = "/services"
+    elseif resource_type == "stream_routes" then
+        key = "/stream_routes"
+    else
+        core.log.error("unsupported resource type: ", resource_type)
+        return nil
+    end
+
+    local data = core.config.fetch_created_obj(key)
+    if not data then
+        core.log.error("failed to fetch configuration for type: ", key)
+        return nil
+    end
+    local resource = data:get(id)
+    if not resource then
+        -- this can happen if the resource was deleted
+        -- after the this function was called so we don't throw error
+        core.log.warn("resource not found: ", id, " in ", key)
+        return nil
+    end
+
+    return resource
+end
+
+
+local function create_checker(up_conf)
+    local local_conf = config_local.local_conf()
+    if local_conf and local_conf.apisix and 
local_conf.apisix.disable_upstream_healthcheck then
+        core.log.info("healthchecker won't be created: disabled upstream 
healthcheck")
+        return nil
+    end
+    core.log.info("creating healthchecker for upstream: ", 
up_conf.resource_key)
+    if not healthcheck then
+        healthcheck = require("resty.healthcheck")
+    end
+
+    local checker, err = healthcheck.new({
+        name = get_healthchecker_name(up_conf),
+        shm_name = healthcheck_shdict_name,
+        checks = up_conf.checks,
+        events_module = events:get_healthcheck_events_modele(),
+    })
+
+    if not checker then
+        core.log.error("failed to create healthcheck: ", err)
+        return nil
+    end
+
+    -- Add target nodes
+    local host = up_conf.checks and up_conf.checks.active and 
up_conf.checks.active.host
+    local port = up_conf.checks and up_conf.checks.active and 
up_conf.checks.active.port
+    local up_hdr = up_conf.pass_host == "rewrite" and up_conf.upstream_host
+    local use_node_hdr = up_conf.pass_host == "node" or nil
+
+    for _, node in ipairs(up_conf.nodes) do
+        local host_hdr = up_hdr or (use_node_hdr and node.domain)
+        local ok, err = checker:add_target(node.host, port or node.port, host,
+                                        true, host_hdr)
+        if not ok then
+            core.log.error("failed to add healthcheck target: ", node.host, 
":",
+                          port or node.port, " err: ", err)
+        end
+    end
+
+    return checker
+end
+
+
+function _M.fetch_checker(resource_path, resource_ver)
+    local working_item = _M.working_pool[resource_path]
+    if working_item and working_item.version == resource_ver then
+        return working_item.checker
+    end
+
+    if _M.waiting_pool[resource_path] == resource_ver then
+        return nil
+    end
+
+    -- Add to waiting pool with version
+    core.log.info("adding ", resource_path, " to waiting pool with version: ", 
resource_ver)
+    _M.waiting_pool[resource_path] = resource_ver
+    return nil
+end
+
+
+function _M.fetch_node_status(checker, ip, port, hostname)
+    -- check if the checker is valid
+    if not checker or checker.dead then
+        return true
+    end
+
+    return checker:get_target_status(ip, port, hostname)
+end
+
+
+local function add_working_pool(resource_path, resource_ver, checker)
+    _M.working_pool[resource_path] = {
+        version = resource_ver,
+        checker = checker
+    }
+end
+
+local function find_in_working_pool(resource_path, resource_ver)
+    local checker = _M.working_pool[resource_path]
+    if not checker then
+        return nil  -- not found
+    end
+
+    if checker.version ~= resource_ver then
+        core.log.info("version mismatch for resource: ", resource_path,
+                    " current version: ", checker.version, " requested 
version: ", resource_ver)
+        return nil  -- version not match
+    end
+    return checker
+end
+
+
+function _M.upstream_version(index, nodes_ver)
+    if not index then
+        return
+    end
+    return index .. tostring(nodes_ver or '')
+end
+
+
+function _M.timer_create_checker()
+    if core.table.nkeys(_M.waiting_pool) == 0 then
+        return
+    end
+
+    local waiting_snapshot = tab_clone(_M.waiting_pool)
+    for resource_path, resource_ver in pairs(waiting_snapshot) do
+        do
+            if find_in_working_pool(resource_path, resource_ver) then
+                core.log.info("resource: ", resource_path,
+                             " already in working pool with version: ",
+                               resource_ver)
+                goto continue
+            end
+            local res_conf = fetch_latest_conf(resource_path)
+            if not res_conf then
+                goto continue
+            end
+            local upstream = res_conf.value.upstream or res_conf.value
+            local new_version = _M.upstream_version(res_conf.modifiedIndex, 
upstream._nodes_ver)
+            core.log.info("checking waiting pool for resource: ", 
resource_path,
+                    " current version: ", new_version, " requested version: ", 
resource_ver)
+            if resource_ver ~= new_version then
+                goto continue
+            end
+
+            -- if a checker exists then delete it before creating a new one
+            local existing_checker = _M.working_pool[resource_path]
+            if existing_checker then
+                existing_checker.checker:delayed_clear(10)

Review Comment:
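   The magic number 10 (seconds) passed to delayed_clear should be defined as a
   named constant (for example a module-level `local DELAYED_CLEAR_TIMEOUT = 10`,
   which this suggestion assumes has been added) so that the cleanup timeout is
   maintained in one place and is easy to make configurable later.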
   ```suggestion
                   existing_checker.checker:delayed_clear(DELAYED_CLEAR_TIMEOUT)
   ```



##########
apisix/healthcheck_manager.lua:
##########
@@ -0,0 +1,297 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local require = require
+local ipairs   = ipairs
+local pcall   = pcall
+local exiting      = ngx.worker.exiting
+local pairs    = pairs
+local tostring = tostring
+local core = require("apisix.core")
+local config_local   = require("apisix.core.config_local")
+local healthcheck
+local events = require("apisix.events")
+local tab_clone = core.table.clone
+local timer_every = ngx.timer.every
+local _M = {
+    working_pool = {},     -- resource_path -> {version = ver, checker = 
checker}
+    waiting_pool = {}      -- resource_path -> resource_ver
+}
+local healthcheck_shdict_name = "upstream-healthcheck"
+local is_http = ngx.config.subsystem == "http"
+if not is_http then
+    healthcheck_shdict_name = healthcheck_shdict_name .. "-" .. 
ngx.config.subsystem
+end
+
+
+local function get_healthchecker_name(value)
+    return "upstream#" .. (value.resource_key or value.upstream.resource_key)
+end
+_M.get_healthchecker_name = get_healthchecker_name
+
+
+local function fetch_latest_conf(resource_path)
+    local resource_type, id
+    -- Handle both formats:
+    -- 1. /apisix/<resource_type>/<id>
+    -- 2. /<resource_type>/<id>
+    if resource_path:find("^/apisix/") then
+        resource_type, id = resource_path:match("^/apisix/([^/]+)/([^/]+)$")
+    else
+        resource_type, id = resource_path:match("^/([^/]+)/([^/]+)$")
+    end
+    if not resource_type or not id then
+        core.log.error("invalid resource path: ", resource_path)
+        return nil
+    end
+
+    local key
+    if resource_type == "upstreams" then
+        key = "/upstreams"
+    elseif resource_type == "routes" then
+        key = "/routes"
+    elseif resource_type == "services" then
+        key = "/services"
+    elseif resource_type == "stream_routes" then
+        key = "/stream_routes"
+    else
+        core.log.error("unsupported resource type: ", resource_type)
+        return nil
+    end
+
+    local data = core.config.fetch_created_obj(key)
+    if not data then
+        core.log.error("failed to fetch configuration for type: ", key)
+        return nil
+    end
+    local resource = data:get(id)
+    if not resource then
+        -- this can happen if the resource was deleted
+        -- after the this function was called so we don't throw error
+        core.log.warn("resource not found: ", id, " in ", key)
+        return nil
+    end
+
+    return resource
+end
+
+
+local function create_checker(up_conf)
+    local local_conf = config_local.local_conf()
+    if local_conf and local_conf.apisix and 
local_conf.apisix.disable_upstream_healthcheck then
+        core.log.info("healthchecker won't be created: disabled upstream 
healthcheck")
+        return nil
+    end
+    core.log.info("creating healthchecker for upstream: ", 
up_conf.resource_key)
+    if not healthcheck then
+        healthcheck = require("resty.healthcheck")
+    end
+
+    local checker, err = healthcheck.new({
+        name = get_healthchecker_name(up_conf),
+        shm_name = healthcheck_shdict_name,
+        checks = up_conf.checks,
+        events_module = events:get_healthcheck_events_modele(),
+    })
+
+    if not checker then
+        core.log.error("failed to create healthcheck: ", err)
+        return nil
+    end
+
+    -- Add target nodes
+    local host = up_conf.checks and up_conf.checks.active and 
up_conf.checks.active.host
+    local port = up_conf.checks and up_conf.checks.active and 
up_conf.checks.active.port
+    local up_hdr = up_conf.pass_host == "rewrite" and up_conf.upstream_host
+    local use_node_hdr = up_conf.pass_host == "node" or nil
+
+    for _, node in ipairs(up_conf.nodes) do
+        local host_hdr = up_hdr or (use_node_hdr and node.domain)
+        local ok, err = checker:add_target(node.host, port or node.port, host,
+                                        true, host_hdr)
+        if not ok then
+            core.log.error("failed to add healthcheck target: ", node.host, 
":",
+                          port or node.port, " err: ", err)
+        end
+    end
+
+    return checker
+end
+
+
+function _M.fetch_checker(resource_path, resource_ver)
+    local working_item = _M.working_pool[resource_path]
+    if working_item and working_item.version == resource_ver then
+        return working_item.checker
+    end
+
+    if _M.waiting_pool[resource_path] == resource_ver then
+        return nil
+    end
+
+    -- Add to waiting pool with version
+    core.log.info("adding ", resource_path, " to waiting pool with version: ", 
resource_ver)
+    _M.waiting_pool[resource_path] = resource_ver
+    return nil
+end
+
+
+function _M.fetch_node_status(checker, ip, port, hostname)
+    -- check if the checker is valid
+    if not checker or checker.dead then
+        return true
+    end
+
+    return checker:get_target_status(ip, port, hostname)
+end
+
+
+local function add_working_pool(resource_path, resource_ver, checker)
+    _M.working_pool[resource_path] = {
+        version = resource_ver,
+        checker = checker
+    }
+end
+
+local function find_in_working_pool(resource_path, resource_ver)
+    local checker = _M.working_pool[resource_path]
+    if not checker then
+        return nil  -- not found
+    end
+
+    if checker.version ~= resource_ver then
+        core.log.info("version mismatch for resource: ", resource_path,
+                    " current version: ", checker.version, " requested 
version: ", resource_ver)
+        return nil  -- version not match
+    end
+    return checker
+end
+
+
+function _M.upstream_version(index, nodes_ver)
+    if not index then
+        return
+    end
+    return index .. tostring(nodes_ver or '')
+end
+
+
+function _M.timer_create_checker()
+    if core.table.nkeys(_M.waiting_pool) == 0 then
+        return
+    end
+
+    local waiting_snapshot = tab_clone(_M.waiting_pool)
+    for resource_path, resource_ver in pairs(waiting_snapshot) do
+        do
+            if find_in_working_pool(resource_path, resource_ver) then
+                core.log.info("resource: ", resource_path,
+                             " already in working pool with version: ",
+                               resource_ver)
+                goto continue
+            end
+            local res_conf = fetch_latest_conf(resource_path)
+            if not res_conf then
+                goto continue
+            end
+            local upstream = res_conf.value.upstream or res_conf.value
+            local new_version = _M.upstream_version(res_conf.modifiedIndex, 
upstream._nodes_ver)
+            core.log.info("checking waiting pool for resource: ", 
resource_path,
+                    " current version: ", new_version, " requested version: ", 
resource_ver)
+            if resource_ver ~= new_version then
+                goto continue
+            end
+
+            -- if a checker exists then delete it before creating a new one
+            local existing_checker = _M.working_pool[resource_path]
+            if existing_checker then
+                existing_checker.checker:delayed_clear(10)
+                existing_checker.checker:stop()
+                core.log.info("releasing existing checker: ", 
tostring(existing_checker.checker))
+            end
+            local checker = create_checker(upstream)
+            if not checker then
+                goto continue
+            end
+            core.log.info("create new checker: ", tostring(checker))
+            add_working_pool(resource_path, resource_ver, checker)
+        end
+
+        ::continue::
+        _M.waiting_pool[resource_path] = nil
+    end
+end
+
+
+function _M.timer_working_pool_check()
+    if core.table.nkeys(_M.working_pool) == 0 then
+        return
+    end
+
+    local working_snapshot = tab_clone(_M.working_pool)
+    for resource_path, item in pairs(working_snapshot) do
+        --- remove from working pool if resource doesn't exist
+        local res_conf = fetch_latest_conf(resource_path)
+        if not res_conf then
+            item.checker:delayed_clear(10)

Review Comment:
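   The magic number 10 (seconds) passed to delayed_clear should be defined as a
   named constant (for example a module-level `local DELAYED_CLEAR_TIMEOUT = 10`,
   which this suggestion assumes has been added) so that the cleanup timeout is
   maintained in one place and is easy to make configurable later.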
   ```suggestion
               item.checker:delayed_clear(DELAYED_CLEAR_TIMEOUT)
   ```



##########
apisix/healthcheck_manager.lua:
##########
@@ -0,0 +1,297 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local require = require
+local ipairs   = ipairs
+local pcall   = pcall
+local exiting      = ngx.worker.exiting
+local pairs    = pairs
+local tostring = tostring
+local core = require("apisix.core")
+local config_local   = require("apisix.core.config_local")
+local healthcheck
+local events = require("apisix.events")
+local tab_clone = core.table.clone
+local timer_every = ngx.timer.every
+local _M = {
+    working_pool = {},     -- resource_path -> {version = ver, checker = 
checker}
+    waiting_pool = {}      -- resource_path -> resource_ver
+}
+local healthcheck_shdict_name = "upstream-healthcheck"
+local is_http = ngx.config.subsystem == "http"
+if not is_http then
+    healthcheck_shdict_name = healthcheck_shdict_name .. "-" .. 
ngx.config.subsystem
+end
+
+
+local function get_healthchecker_name(value)
+    return "upstream#" .. (value.resource_key or value.upstream.resource_key)
+end
+_M.get_healthchecker_name = get_healthchecker_name
+
+
+local function fetch_latest_conf(resource_path)
+    local resource_type, id
+    -- Handle both formats:
+    -- 1. /apisix/<resource_type>/<id>
+    -- 2. /<resource_type>/<id>
+    if resource_path:find("^/apisix/") then
+        resource_type, id = resource_path:match("^/apisix/([^/]+)/([^/]+)$")
+    else
+        resource_type, id = resource_path:match("^/([^/]+)/([^/]+)$")
+    end
+    if not resource_type or not id then
+        core.log.error("invalid resource path: ", resource_path)
+        return nil
+    end
+
+    local key
+    if resource_type == "upstreams" then
+        key = "/upstreams"
+    elseif resource_type == "routes" then
+        key = "/routes"
+    elseif resource_type == "services" then
+        key = "/services"
+    elseif resource_type == "stream_routes" then
+        key = "/stream_routes"
+    else
+        core.log.error("unsupported resource type: ", resource_type)
+        return nil
+    end
+
+    local data = core.config.fetch_created_obj(key)
+    if not data then
+        core.log.error("failed to fetch configuration for type: ", key)
+        return nil
+    end
+    local resource = data:get(id)
+    if not resource then
+        -- this can happen if the resource was deleted
+        -- after the this function was called so we don't throw error
+        core.log.warn("resource not found: ", id, " in ", key)
+        return nil
+    end
+
+    return resource
+end
+
+
+local function create_checker(up_conf)
+    local local_conf = config_local.local_conf()
+    if local_conf and local_conf.apisix and 
local_conf.apisix.disable_upstream_healthcheck then
+        core.log.info("healthchecker won't be created: disabled upstream 
healthcheck")
+        return nil
+    end
+    core.log.info("creating healthchecker for upstream: ", 
up_conf.resource_key)
+    if not healthcheck then
+        healthcheck = require("resty.healthcheck")
+    end
+
+    local checker, err = healthcheck.new({
+        name = get_healthchecker_name(up_conf),
+        shm_name = healthcheck_shdict_name,
+        checks = up_conf.checks,
+        events_module = events:get_healthcheck_events_modele(),
+    })
+
+    if not checker then
+        core.log.error("failed to create healthcheck: ", err)
+        return nil
+    end
+
+    -- Add target nodes
+    local host = up_conf.checks and up_conf.checks.active and 
up_conf.checks.active.host
+    local port = up_conf.checks and up_conf.checks.active and 
up_conf.checks.active.port
+    local up_hdr = up_conf.pass_host == "rewrite" and up_conf.upstream_host
+    local use_node_hdr = up_conf.pass_host == "node" or nil
+
+    for _, node in ipairs(up_conf.nodes) do
+        local host_hdr = up_hdr or (use_node_hdr and node.domain)
+        local ok, err = checker:add_target(node.host, port or node.port, host,
+                                        true, host_hdr)
+        if not ok then
+            core.log.error("failed to add healthcheck target: ", node.host, 
":",
+                          port or node.port, " err: ", err)
+        end
+    end
+
+    return checker
+end
+
+
+function _M.fetch_checker(resource_path, resource_ver)
+    local working_item = _M.working_pool[resource_path]
+    if working_item and working_item.version == resource_ver then
+        return working_item.checker
+    end
+
+    if _M.waiting_pool[resource_path] == resource_ver then
+        return nil
+    end
+
+    -- Add to waiting pool with version
+    core.log.info("adding ", resource_path, " to waiting pool with version: ", 
resource_ver)
+    _M.waiting_pool[resource_path] = resource_ver
+    return nil
+end
+
+
+function _M.fetch_node_status(checker, ip, port, hostname)
+    -- check if the checker is valid
+    if not checker or checker.dead then
+        return true
+    end
+
+    return checker:get_target_status(ip, port, hostname)
+end
+
+
+local function add_working_pool(resource_path, resource_ver, checker)
+    _M.working_pool[resource_path] = {
+        version = resource_ver,
+        checker = checker
+    }
+end
+
+local function find_in_working_pool(resource_path, resource_ver)
+    local checker = _M.working_pool[resource_path]
+    if not checker then
+        return nil  -- not found
+    end
+
+    if checker.version ~= resource_ver then
+        core.log.info("version mismatch for resource: ", resource_path,
+                    " current version: ", checker.version, " requested 
version: ", resource_ver)
+        return nil  -- version not match
+    end
+    return checker
+end
+
+
+function _M.upstream_version(index, nodes_ver)
+    if not index then
+        return
+    end
+    return index .. tostring(nodes_ver or '')
+end
+
+
+function _M.timer_create_checker()
+    if core.table.nkeys(_M.waiting_pool) == 0 then
+        return
+    end
+
+    local waiting_snapshot = tab_clone(_M.waiting_pool)
+    for resource_path, resource_ver in pairs(waiting_snapshot) do
+        do
+            if find_in_working_pool(resource_path, resource_ver) then
+                core.log.info("resource: ", resource_path,
+                             " already in working pool with version: ",
+                               resource_ver)
+                goto continue
+            end
+            local res_conf = fetch_latest_conf(resource_path)
+            if not res_conf then
+                goto continue
+            end
+            local upstream = res_conf.value.upstream or res_conf.value
+            local new_version = _M.upstream_version(res_conf.modifiedIndex, 
upstream._nodes_ver)
+            core.log.info("checking waiting pool for resource: ", 
resource_path,
+                    " current version: ", new_version, " requested version: ", 
resource_ver)
+            if resource_ver ~= new_version then
+                goto continue
+            end
+
+            -- if a checker exists then delete it before creating a new one
+            local existing_checker = _M.working_pool[resource_path]
+            if existing_checker then
+                existing_checker.checker:delayed_clear(10)
+                existing_checker.checker:stop()
+                core.log.info("releasing existing checker: ", 
tostring(existing_checker.checker))
+            end
+            local checker = create_checker(upstream)
+            if not checker then
+                goto continue
+            end
+            core.log.info("create new checker: ", tostring(checker))
+            add_working_pool(resource_path, resource_ver, checker)
+        end
+
+        ::continue::
+        _M.waiting_pool[resource_path] = nil
+    end
+end
+
+
+function _M.timer_working_pool_check()
+    if core.table.nkeys(_M.working_pool) == 0 then
+        return
+    end
+
+    local working_snapshot = tab_clone(_M.working_pool)
+    for resource_path, item in pairs(working_snapshot) do
+        --- remove from working pool if resource doesn't exist
+        local res_conf = fetch_latest_conf(resource_path)
+        if not res_conf then
+            item.checker:delayed_clear(10)
+            item.checker:stop()
+            core.log.info("try to release checker: ", tostring(item.checker))
+            _M.working_pool[resource_path] = nil
+            goto continue
+        end

Review Comment:
   The code assumes res_conf.value exists but doesn't validate it before
   accessing res_conf.value._nodes_ver on the next line, which could raise an
   "attempt to index a nil value" error.
   ```suggestion
           end
           if not res_conf.value then
               item.checker:delayed_clear(10)
               item.checker:stop()
               core.log.info("try to release checker due to missing value: ", 
tostring(item.checker))
               _M.working_pool[resource_path] = nil
               goto continue
           end
   ```



##########
apisix/healthcheck_manager.lua:
##########
@@ -0,0 +1,297 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local require = require
+local ipairs   = ipairs
+local pcall   = pcall
+local exiting      = ngx.worker.exiting
+local pairs    = pairs
+local tostring = tostring
+local core = require("apisix.core")
+local config_local   = require("apisix.core.config_local")
+local healthcheck
+local events = require("apisix.events")
+local tab_clone = core.table.clone
+local timer_every = ngx.timer.every
+local _M = {
+    working_pool = {},     -- resource_path -> {version = ver, checker = 
checker}
+    waiting_pool = {}      -- resource_path -> resource_ver
+}
+local healthcheck_shdict_name = "upstream-healthcheck"
+local is_http = ngx.config.subsystem == "http"
+if not is_http then
+    healthcheck_shdict_name = healthcheck_shdict_name .. "-" .. 
ngx.config.subsystem
+end
+
+
+local function get_healthchecker_name(value)
+    return "upstream#" .. (value.resource_key or value.upstream.resource_key)
+end
+_M.get_healthchecker_name = get_healthchecker_name
+
+
+local function fetch_latest_conf(resource_path)
+    local resource_type, id
+    -- Handle both formats:
+    -- 1. /apisix/<resource_type>/<id>
+    -- 2. /<resource_type>/<id>
+    if resource_path:find("^/apisix/") then
+        resource_type, id = resource_path:match("^/apisix/([^/]+)/([^/]+)$")
+    else
+        resource_type, id = resource_path:match("^/([^/]+)/([^/]+)$")
+    end
+    if not resource_type or not id then
+        core.log.error("invalid resource path: ", resource_path)
+        return nil
+    end
+
+    local key
+    if resource_type == "upstreams" then
+        key = "/upstreams"
+    elseif resource_type == "routes" then
+        key = "/routes"
+    elseif resource_type == "services" then
+        key = "/services"
+    elseif resource_type == "stream_routes" then
+        key = "/stream_routes"
+    else
+        core.log.error("unsupported resource type: ", resource_type)
+        return nil
+    end
+
+    local data = core.config.fetch_created_obj(key)
+    if not data then
+        core.log.error("failed to fetch configuration for type: ", key)
+        return nil
+    end
+    local resource = data:get(id)
+    if not resource then
+        -- this can happen if the resource was deleted
+        -- after the this function was called so we don't throw error
+        core.log.warn("resource not found: ", id, " in ", key)
+        return nil

Review Comment:
   When a resource is not found, the function returns a bare nil, so the caller
   cannot tell whether this was due to a deleted resource or a configuration
   error. Returning an error string as a second value (`nil, err`) on each
   failure path gives the caller that context.
   ```suggestion
           return nil, "unsupported resource type"
       end
   
       local data = core.config.fetch_created_obj(key)
       if not data then
           core.log.error("failed to fetch configuration for type: ", key)
           return nil, "failed to fetch configuration"
       end
       local resource = data:get(id)
       if not resource then
           -- this can happen if the resource was deleted
           -- after this function was called, so we don't throw an error
           core.log.warn("resource not found: ", id, " in ", key)
           return nil, "resource not found"
   ```



##########
apisix/healthcheck_manager.lua:
##########
@@ -0,0 +1,297 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local require = require
+local ipairs   = ipairs
+local pcall   = pcall
+local exiting      = ngx.worker.exiting
+local pairs    = pairs
+local tostring = tostring
+local core = require("apisix.core")
+local config_local   = require("apisix.core.config_local")
+local healthcheck
+local events = require("apisix.events")
+local tab_clone = core.table.clone
+local timer_every = ngx.timer.every
+local _M = {
+    working_pool = {},     -- resource_path -> {version = ver, checker = 
checker}
+    waiting_pool = {}      -- resource_path -> resource_ver
+}
+local healthcheck_shdict_name = "upstream-healthcheck"
+local is_http = ngx.config.subsystem == "http"
+if not is_http then
+    healthcheck_shdict_name = healthcheck_shdict_name .. "-" .. 
ngx.config.subsystem
+end
+
+
+local function get_healthchecker_name(value)
+    return "upstream#" .. (value.resource_key or value.upstream.resource_key)
+end
+_M.get_healthchecker_name = get_healthchecker_name
+
+
+local function fetch_latest_conf(resource_path)
+    local resource_type, id
+    -- Handle both formats:
+    -- 1. /apisix/<resource_type>/<id>
+    -- 2. /<resource_type>/<id>
+    if resource_path:find("^/apisix/") then
+        resource_type, id = resource_path:match("^/apisix/([^/]+)/([^/]+)$")
+    else
+        resource_type, id = resource_path:match("^/([^/]+)/([^/]+)$")
+    end
+    if not resource_type or not id then
+        core.log.error("invalid resource path: ", resource_path)
+        return nil
+    end
+
+    local key
+    if resource_type == "upstreams" then
+        key = "/upstreams"
+    elseif resource_type == "routes" then
+        key = "/routes"
+    elseif resource_type == "services" then
+        key = "/services"
+    elseif resource_type == "stream_routes" then
+        key = "/stream_routes"
+    else
+        core.log.error("unsupported resource type: ", resource_type)
+        return nil
+    end
+
+    local data = core.config.fetch_created_obj(key)
+    if not data then
+        core.log.error("failed to fetch configuration for type: ", key)
+        return nil
+    end
+    local resource = data:get(id)
+    if not resource then
+        -- this can happen if the resource was deleted
+        -- after the this function was called so we don't throw error
+        core.log.warn("resource not found: ", id, " in ", key)
+        return nil
+    end
+
+    return resource
+end
+
+
+local function create_checker(up_conf)
+    local local_conf = config_local.local_conf()
+    if local_conf and local_conf.apisix and 
local_conf.apisix.disable_upstream_healthcheck then
+        core.log.info("healthchecker won't be created: disabled upstream 
healthcheck")
+        return nil
+    end
+    core.log.info("creating healthchecker for upstream: ", 
up_conf.resource_key)
+    if not healthcheck then
+        healthcheck = require("resty.healthcheck")
+    end
+
+    local checker, err = healthcheck.new({
+        name = get_healthchecker_name(up_conf),
+        shm_name = healthcheck_shdict_name,
+        checks = up_conf.checks,
+        events_module = events:get_healthcheck_events_modele(),
+    })
+
+    if not checker then
+        core.log.error("failed to create healthcheck: ", err)
+        return nil
+    end
+
+    -- Add target nodes
+    local host = up_conf.checks and up_conf.checks.active and 
up_conf.checks.active.host
+    local port = up_conf.checks and up_conf.checks.active and 
up_conf.checks.active.port
+    local up_hdr = up_conf.pass_host == "rewrite" and up_conf.upstream_host
+    local use_node_hdr = up_conf.pass_host == "node" or nil
+
+    for _, node in ipairs(up_conf.nodes) do
+        local host_hdr = up_hdr or (use_node_hdr and node.domain)
+        local ok, err = checker:add_target(node.host, port or node.port, host,
+                                        true, host_hdr)
+        if not ok then
+            core.log.error("failed to add healthcheck target: ", node.host, 
":",
+                          port or node.port, " err: ", err)
+        end
+    end
+
+    return checker
+end
+
+
+function _M.fetch_checker(resource_path, resource_ver)
+    local working_item = _M.working_pool[resource_path]
+    if working_item and working_item.version == resource_ver then
+        return working_item.checker
+    end
+
+    if _M.waiting_pool[resource_path] == resource_ver then
+        return nil
+    end
+
+    -- Add to waiting pool with version
+    core.log.info("adding ", resource_path, " to waiting pool with version: ", 
resource_ver)
+    _M.waiting_pool[resource_path] = resource_ver
+    return nil
+end
+
+
+function _M.fetch_node_status(checker, ip, port, hostname)
+    -- check if the checker is valid
+    if not checker or checker.dead then
+        return true
+    end
+
+    return checker:get_target_status(ip, port, hostname)
+end
+
+
+local function add_working_pool(resource_path, resource_ver, checker)
+    _M.working_pool[resource_path] = {
+        version = resource_ver,
+        checker = checker
+    }
+end
+
+local function find_in_working_pool(resource_path, resource_ver)
+    local checker = _M.working_pool[resource_path]
+    if not checker then
+        return nil  -- not found
+    end
+
+    if checker.version ~= resource_ver then
+        core.log.info("version mismatch for resource: ", resource_path,
+                    " current version: ", checker.version, " requested 
version: ", resource_ver)
+        return nil  -- version not match
+    end
+    return checker
+end
+
+
+function _M.upstream_version(index, nodes_ver)
+    if not index then
+        return
+    end
+    return index .. tostring(nodes_ver or '')
+end
+
+
+function _M.timer_create_checker()
+    if core.table.nkeys(_M.waiting_pool) == 0 then
+        return
+    end
+
+    local waiting_snapshot = tab_clone(_M.waiting_pool)
+    for resource_path, resource_ver in pairs(waiting_snapshot) do
+        do
+            if find_in_working_pool(resource_path, resource_ver) then
+                core.log.info("resource: ", resource_path,
+                             " already in working pool with version: ",
+                               resource_ver)
+                goto continue
+            end
+            local res_conf = fetch_latest_conf(resource_path)
+            if not res_conf then
+                goto continue
+            end
+            local upstream = res_conf.value.upstream or res_conf.value
+            local new_version = _M.upstream_version(res_conf.modifiedIndex, 
upstream._nodes_ver)
+            core.log.info("checking waiting pool for resource: ", 
resource_path,
+                    " current version: ", new_version, " requested version: ", 
resource_ver)
+            if resource_ver ~= new_version then
+                goto continue
+            end
+
+            -- if a checker exists then delete it before creating a new one
+            local existing_checker = _M.working_pool[resource_path]
+            if existing_checker then
+                existing_checker.checker:delayed_clear(10)
+                existing_checker.checker:stop()
+                core.log.info("releasing existing checker: ", 
tostring(existing_checker.checker))
+            end
+            local checker = create_checker(upstream)
+            if not checker then
+                goto continue
+            end
+            core.log.info("create new checker: ", tostring(checker))
+            add_working_pool(resource_path, resource_ver, checker)
+        end
+
+        ::continue::
+        _M.waiting_pool[resource_path] = nil
+    end
+end
+
+
+function _M.timer_working_pool_check()
+    if core.table.nkeys(_M.working_pool) == 0 then
+        return
+    end
+
+    local working_snapshot = tab_clone(_M.working_pool)
+    for resource_path, item in pairs(working_snapshot) do
+        --- remove from working pool if resource doesn't exist
+        local res_conf = fetch_latest_conf(resource_path)
+        if not res_conf then
+            item.checker:delayed_clear(10)
+            item.checker:stop()
+            core.log.info("try to release checker: ", tostring(item.checker))
+            _M.working_pool[resource_path] = nil
+            goto continue
+        end
+        local current_ver = _M.upstream_version(res_conf.modifiedIndex,
+                                                res_conf.value._nodes_ver)
+        core.log.info("checking working pool for resource: ", resource_path,
+                    " current version: ", current_ver, " item version: ", 
item.version)
+        if item.version ~= current_ver then
+            item.checker:delayed_clear(10)

Review Comment:
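   The magic number 10 (seconds) passed to `delayed_clear` should be defined as 
a named constant to improve maintainability and make the cleanup timeout easy 
to change in one place. The suggestion below assumes a declaration such as 
`local DELAYED_CLEAR_TIMEOUT = 10` near the top of the module; without it the 
name resolves to nil. The same constant should also replace the other 
`delayed_clear(10)` calls in `timer_create_checker` and 
`timer_working_pool_check`.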
   ```suggestion
               item.checker:delayed_clear(DELAYED_CLEAR_TIMEOUT)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@apisix.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to