This is an automated email from the ASF dual-hosted git repository.

nic-6443 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix.git


The following commit(s) were added to refs/heads/master by this push:
     new ec981038c fix: keep chash ring stable during health changes (#13532)
ec981038c is described below

commit ec981038cb97c679f8863835d6ff68094bf5ab77
Author: Nic <[email protected]>
AuthorDate: Tue Jun 16 21:18:52 2026 +0800

    fix: keep chash ring stable during health changes (#13532)
---
 apisix/balancer.lua                                | 115 ++++++++--
 apisix/healthcheck_manager.lua                     |  15 +-
 apisix/plugins/ai-proxy-multi.lua                  | 171 +++++++++++---
 t/node/chash-healthcheck-stable-ring.t             | 204 +++++++++++++++++
 .../ai-proxy-multi-chash-healthcheck-stable-ring.t | 252 +++++++++++++++++++++
 5 files changed, 705 insertions(+), 52 deletions(-)

diff --git a/apisix/balancer.lua b/apisix/balancer.lua
index 5b6b3b029..7a358dadf 100644
--- a/apisix/balancer.lua
+++ b/apisix/balancer.lua
@@ -34,6 +34,9 @@ local pickers = {}
 local lrucache_server_picker = core.lrucache.new({
     ttl = 300, count = 256
 })
+local lrucache_health_status = core.lrucache.new({
+    ttl = 300, count = 256
+})
 local lrucache_addr = core.lrucache.new({
     ttl = 300, count = 1024 * 4
 })
@@ -60,33 +63,63 @@ local function transform_node(new_nodes, node)
 end


-local function fetch_health_nodes(upstream, checker)
+local function fetch_all_nodes(upstream)
     local nodes = upstream.nodes
-    if not checker then
-        local new_nodes = core.table.new(0, #nodes)
-        for _, node in ipairs(nodes) do
-            new_nodes = transform_node(new_nodes, node)
-        end
-        return new_nodes
+    local new_nodes = core.table.new(0, #nodes)
+    for _, node in ipairs(nodes) do
+        new_nodes = transform_node(new_nodes, node)
     end
+    return new_nodes
+end

+
+local function create_health_status(upstream, checker)
+    local nodes = upstream.nodes
     local host = upstream.checks and upstream.checks.active and upstream.checks.active.host
     local port = upstream.checks and upstream.checks.active and upstream.checks.active.port
-    local up_nodes = core.table.new(0, #nodes)
+    local health_status = core.table.new(0, #nodes)
+    local has_healthy_node = false
+
     for _, node in ipairs(nodes) do
         local ok, err = healthcheck_manager.fetch_node_status(checker,
                                              node.host, port or node.port, host)
+        local addr = node.host .. ":" .. node.port
         if ok then
-            up_nodes = transform_node(up_nodes, node)
-        elseif err then
-            core.log.warn("failed to get health check target status, addr: ",
-                node.host, ":", port or node.port, ", host: ", host, ", err: ", err)
+            health_status[addr] = true
+            has_healthy_node = true
+        else
+            health_status[addr] = false
+            if err then
+                core.log.warn("failed to get health check target status, addr: ",
+                    node.host, ":", port or node.port, ", host: ", host, ", err: ", err)
+            end
         end
     end

-    if core.table.nkeys(up_nodes) == 0 then
+    if not has_healthy_node then
         core.log.warn("all upstream nodes is unhealthy, use default")
-        for _, node in ipairs(nodes) do
+        return {all_unhealthy = true}
+    end
+
+    return {status = health_status}
+end
+
+
+-- Build the picker node set from the healthy subset, reusing create_health_status
+-- so the per-node health lookup lives in exactly one place.
+local function fetch_health_nodes(upstream, checker)
+    if not checker then
+        return fetch_all_nodes(upstream)
+    end
+
+    local health_status = create_health_status(upstream, checker)
+    if health_status.all_unhealthy then
+        return fetch_all_nodes(upstream)
+    end
+
+    local up_nodes = core.table.new(0, #upstream.nodes)
+    for _, node in ipairs(upstream.nodes) do
+        if health_status.status[node.host .. ":" .. node.port] then
             up_nodes = transform_node(up_nodes, node)
         end
     end
@@ -95,6 +128,21 @@ local function fetch_health_nodes(upstream, checker)
 end


+local function fetch_health_status(upstream, checker, key, version)
+    if not checker then
+        return nil
+    end
+
+    local health_status = lrucache_health_status(key, version .. "#" .. checker.status_ver,
+                                                 create_health_status, upstream, checker)
+    if not health_status or health_status.all_unhealthy then
+        return nil
+    end
+
+    return health_status.status
+end
+
+
 local function create_server_picker(upstream, checker)
     local picker = pickers[upstream.type]
     if not picker then
@@ -112,7 +160,12 @@ local function create_server_picker(upstream, checker)
             end
         end

-        local up_nodes = fetch_health_nodes(upstream, checker)
+        local up_nodes
+        if upstream.type == "chash" then
+            up_nodes = fetch_all_nodes(upstream)
+        else
+            up_nodes = fetch_health_nodes(upstream, checker)
+        end

         if #up_nodes._priority_index > 1 then
             core.log.info("upstream nodes: ", core.json.delay_encode(up_nodes))
@@ -229,7 +282,12 @@ local function pick_server(route, ctx)
         end
     end

-    if checker then
+    local health_status
+    if checker and up_conf.type == "chash" then
+        health_status = fetch_health_status(up_conf, checker, key, version)
+    end
+
+    if checker and up_conf.type ~= "chash" then
         version = version .. "#" .. checker.status_ver
     end

@@ -243,10 +301,29 @@ local function pick_server(route, ctx)
         return nil, "failed to fetch server picker"
     end

-    local server, err = server_picker.get(ctx)
+    local server, err
+    for _ = 1, nodes_count do
+        server, err = server_picker.get(ctx)
+        if not server then
+            err = err or "no valid upstream node"
+            return nil, "failed to find valid upstream server, " .. err
+        end
+
+        if not health_status or health_status[server] then
+            break
+        end
+
+        ctx.balancer_server = server
+        if not server_picker.after_balance then
+            return nil, "failed to skip unhealthy upstream server: after_balance is unavailable"
+        end
+
+        server_picker.after_balance(ctx, true)
+        server = nil
+    end
+
     if not server then
-        err = err or "no valid upstream node"
-        return nil, "failed to find valid upstream server, " .. err
+        return nil, "failed to find valid upstream server, all upstream servers tried"
     end
     ctx.balancer_server = server

diff --git a/apisix/healthcheck_manager.lua b/apisix/healthcheck_manager.lua
index 8133364ee..1ee49f64d 100644
--- a/apisix/healthcheck_manager.lua
+++ b/apisix/healthcheck_manager.lua
@@ -113,7 +113,20 @@ function _M.fetch_node_status(checker, ip, port, hostname)
         return true
     end

-    return checker:get_target_status(ip, port, hostname)
+    local ok, err = checker:get_target_status(ip, port, hostname)
+    if err == "target not found" then
+        -- get_target_status reads a worker-local cache that resty.healthcheck fills
+        -- asynchronously (add_target only raises an event), so right after a checker
+        -- is created a target can be missing from this worker's view even though it
+        -- is registered in the shm and being probed. Treat it as unknown (usable)
+        -- rather than unhealthy, but still log it: a target that stays missing means
+        -- the cache never converged, a real bug worth surfacing rather than swallowing.
+        core.log.warn("health check target status not available yet, treat as unknown",
+                      ", addr: ", ip, ":", port, ", host: ", hostname)
+        return true
+    end
+
+    return ok, err
 end


diff --git a/apisix/plugins/ai-proxy-multi.lua b/apisix/plugins/ai-proxy-multi.lua
index 13a4b8e3e..4ba204ecc 100644
--- a/apisix/plugins/ai-proxy-multi.lua
+++ b/apisix/plugins/ai-proxy-multi.lua
@@ -45,6 +45,9 @@ local pickers = {}
 local lrucache_server_picker = core.lrucache.new({
     ttl = 300, count = 256
 })
+local lrucache_health_status = core.lrucache.new({
+    ttl = 300, count = 256
+})

 local plugin_name = "ai-proxy-multi"
 local _M = {
@@ -346,26 +349,33 @@ local function get_checkers_status_ver(conf, checkers)
 end


-local function fetch_health_instances(conf, checkers)
+local function fetch_all_instances(conf)
     local instances = conf.instances
     local new_instances = core.table.new(0, #instances)
-    if not checkers then
-        for _, ins in ipairs(conf.instances) do
-            transform_instances(new_instances, ins)
-        end
-        return new_instances
+    for _, ins in ipairs(instances) do
+        transform_instances(new_instances, ins)
     end

+    return new_instances
+end
+
+
+local function create_health_status(conf, checkers)
+    local instances = conf.instances
+    local health_status = core.table.new(0, #instances)
+    local healthy_dns_nodes = core.table.new(0, #instances)
+    local has_healthy_instance = false
+
     for _, ins in ipairs(instances) do
         local checker = checkers[ins.name]
         if checker then
             local host = ins.checks and ins.checks.active and ins.checks.active.host
             local port = ins.checks and ins.checks.active and ins.checks.active.port
             local healthy_nodes = {}
-            ins._healthy_dns_nodes = nil

             for _, node in ipairs(ins._dns_nodes or {}) do
-                local ok, err = checker:get_target_status(node.host, port or node.port, host)
+                local ok, err = healthcheck_manager.fetch_node_status(checker,
+                                                     node.host, port or node.port, host)
                 if ok then
                     healthy_nodes[#healthy_nodes + 1] = node
                 elseif err then
@@ -375,18 +385,63 @@ local function fetch_health_instances(conf, checkers)
             end

             if #healthy_nodes > 0 then
-                ins._healthy_dns_nodes = healthy_nodes
-                transform_instances(new_instances, ins)
+                healthy_dns_nodes[ins.name] = healthy_nodes
+                health_status[ins.name] = true
+                has_healthy_instance = true
+            else
+                health_status[ins.name] = false
             end
         else
-            ins._healthy_dns_nodes = nil
-            transform_instances(new_instances, ins)
+            health_status[ins.name] = true
+            has_healthy_instance = true
         end
     end

-    if core.table.nkeys(new_instances) == 0 then
+    if not has_healthy_instance then
         core.log.warn("all upstream nodes is unhealthy, use default")
-        for _, ins in ipairs(instances) do
+        return {all_unhealthy = true}
+    end
+
+    return {
+        status = health_status,
+        healthy_dns_nodes = healthy_dns_nodes,
+    }
+end
+
+
+local function apply_health_status(conf, health_status)
+    if not health_status or health_status.all_unhealthy then
+        for _, ins in ipairs(conf.instances) do
+            ins._healthy_dns_nodes = nil
+        end
+
+        return nil
+    end
+
+    for _, ins in ipairs(conf.instances) do
+        ins._healthy_dns_nodes = health_status.healthy_dns_nodes[ins.name]
+    end
+
+    return health_status.status
+end
+
+
+-- Build the picker instance set from the healthy subset, reusing
+-- create_health_status/apply_health_status so the per-instance health lookup
+-- lives in exactly one place.
+local function fetch_health_instances(conf, checkers)
+    if not checkers then
+        return fetch_all_instances(conf)
+    end
+
+    local status = apply_health_status(conf, create_health_status(conf, checkers))
+    if not status then
+        return fetch_all_instances(conf)
+    end
+
+    local new_instances = core.table.new(0, #conf.instances)
+    for _, ins in ipairs(conf.instances) do
+        if status[ins.name] then
             transform_instances(new_instances, ins)
         end
     end
@@ -395,6 +450,29 @@ local function fetch_health_instances(conf, checkers)
 end


+local function get_health_status_ver(conf, checkers)
+    local parts = core.table.new(#conf.instances, 0)
+    for i, ins in ipairs(conf.instances) do
+        local checker = checkers[ins.name]
+        parts[i] = (ins._nodes_ver or 0) .. ":" .. (checker and checker.status_ver or "x")
+    end
+
+    return table_concat(parts, "-")
+end
+
+
+local function fetch_health_status(conf, checkers, key, version)
+    if not checkers then
+        return nil
+    end
+
+    local health_status = lrucache_health_status(key, version .. "#" ..
+                                                 get_health_status_ver(conf, checkers),
+                                                 create_health_status, conf, checkers)
+    return apply_health_status(conf, health_status)
+end
+
+
 local function create_server_picker(conf, ups_tab, checkers)
     local picker = pickers[conf.balancer.algorithm] -- nil check
     if not picker then
@@ -402,7 +480,12 @@ local function create_server_picker(conf, ups_tab, checkers)
         picker = pickers[conf.balancer.algorithm]
     end

-    local new_instances = fetch_health_instances(conf, checkers)
+    local new_instances
+    if conf.balancer.algorithm == "chash" then
+        new_instances = fetch_all_instances(conf)
+    else
+        new_instances = fetch_health_instances(conf, checkers)
+    end
     core.log.info("fetch health instances: ", core.json.delay_encode(new_instances))

     if #new_instances._priority_index > 1 then
@@ -448,8 +531,13 @@ local function pick_target(ctx, conf, ups_tab)
         end
     end

-    local version = plugin.conf_version(conf) .. "#" ..
-                    get_checkers_status_ver(conf, checkers)
+    local health_status
+    local version = plugin.conf_version(conf)
+    if conf.balancer.algorithm == "chash" then
+        health_status = fetch_health_status(conf, checkers, ctx.matched_route.key, version)
+    else
+        version = version .. "#" .. get_checkers_status_ver(conf, checkers)
+    end

     local server_picker = ctx.server_picker
     if not server_picker then
@@ -461,29 +549,48 @@ local function pick_target(ctx, conf, ups_tab)
     end
     ctx.server_picker = server_picker

-    local instance_name, err = server_picker.get(ctx)
-    if err then
-        return nil, nil, err
+    local ai_rate_limiting
+    local check_rate_limiting = conf.fallback_strategy == "instance_health_and_rate_limiting" or
+                                fallback_strategy_has(conf.fallback_strategy, "rate_limiting")
+    if check_rate_limiting then
+        ai_rate_limiting = require("apisix.plugins.ai-rate-limiting")
     end
-    ctx.balancer_server = instance_name
-    if conf.fallback_strategy == "instance_health_and_rate_limiting" or -- for backwards compatible
-       fallback_strategy_has(conf.fallback_strategy, "rate_limiting") then
-        local ai_rate_limiting = require("apisix.plugins.ai-rate-limiting")
-        for _ = 1, #conf.instances do
-            if ai_rate_limiting.check_instance_status(nil, ctx, instance_name) then
+
+    local instance_name, err
+    for _ = 1, #conf.instances do
+        instance_name, err = server_picker.get(ctx)
+        if err then
+            return nil, nil, err
+        end
+
+        if not health_status or health_status[instance_name] then
+            if not check_rate_limiting or
+               ai_rate_limiting.check_instance_status(nil, ctx, instance_name) then
                 break
             end
             core.log.warn("ai instance: ", instance_name,
                              " is not available, try to pick another one")
-            server_picker.after_balance(ctx, true)
-            instance_name, err = server_picker.get(ctx)
-            if err then
-                return nil, nil, err
-            end
-            ctx.balancer_server = instance_name
+
+        else
+            core.log.warn("ai instance: ", instance_name,
+                             " is unhealthy, try to pick another one")
         end
+
+        ctx.balancer_server = instance_name
+        if not server_picker.after_balance then
+            return nil, nil, "failed to skip AI instance: after_balance is unavailable"
+        end
+
+        server_picker.after_balance(ctx, true)
+        instance_name = nil
     end

+    if not instance_name then
+        return nil, nil, "all servers tried"
+    end
+
+    ctx.balancer_server = instance_name
+
     local instance_conf = get_instance_conf(conf.instances, instance_name)
     local nodes = instance_conf._healthy_dns_nodes or instance_conf._dns_nodes
     use_node_for_request(instance_conf, pick_request_node(nodes))
diff --git a/t/node/chash-healthcheck-stable-ring.t b/t/node/chash-healthcheck-stable-ring.t
new file mode 100644
index 000000000..49eca4301
--- /dev/null
+++ b/t/node/chash-healthcheck-stable-ring.t
@@ -0,0 +1,204 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+use t::APISIX 'no_plan';
+
+repeat_each(1);
+log_level('warn');
+no_root_location();
+no_shuffle();
+
+add_block_preprocessor(sub {
+    my ($block) = @_;
+
+    my $http_config = <<_EOC_;
+        server {
+            listen 127.0.0.1:16730;
+
+            location /server_port {
+                content_by_lua_block {
+                    ngx.print("16730")
+                }
+            }
+
+            location /status {
+                return 500;
+            }
+        }
+
+        server {
+            listen 127.0.0.1:16731;
+
+            location /server_port {
+                content_by_lua_block {
+                    ngx.print("16731")
+                }
+            }
+
+            location /status {
+                return 200;
+            }
+        }
+
+        server {
+            listen 127.0.0.1:16732;
+
+            location /server_port {
+                content_by_lua_block {
+                    ngx.print("16732")
+                }
+            }
+
+            location /status {
+                return 200;
+            }
+        }
+_EOC_
+    $block->set_value("http_config", $http_config);
+});
+
+run_tests();
+
+__DATA__
+
+=== TEST 1: chash keeps healthy node mapping stable when health status changes
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local json = require("cjson.safe")
+            local http = require("resty.http")
+
+            local unhealthy_port = "16730"
+            local nodes = {
+                ["127.0.0.1:" .. unhealthy_port] = 3,
+                ["127.0.0.1:16731"] = 6,
+                ["127.0.0.1:16732"] = 10,
+            }
+
+            local function put_route(with_checks)
+                local upstream = {
+                    type = "chash",
+                    hash_on = "header",
+                    key = "X-Sessionid",
+                    nodes = nodes,
+                }
+
+                if with_checks then
+                    upstream.checks = {
+                        active = {
+                            http_path = "/status",
+                            healthy = {
+                                interval = 1,
+                                http_statuses = {200},
+                                successes = 1,
+                            },
+                            unhealthy = {
+                                interval = 1,
+                                http_statuses = {500},
+                                http_failures = 1,
+                                tcp_failures = 1,
+                                timeouts = 1,
+                            },
+                        },
+                    }
+                end
+
+                local code, body = t('/apisix/admin/routes/1',
+                    ngx.HTTP_PUT,
+                    json.encode({
+                        uri = "/server_port",
+                        upstream = upstream,
+                    })
+                )
+                assert(code < 300, body)
+
+                return upstream
+            end
+
+            local function send(session_id)
+                local httpc = http.new()
+                local res, err = httpc:request_uri(
+                    "http://127.0.0.1:" .. ngx.var.server_port .. "/server_port",
+                    {
+                        method = "GET",
+                        keepalive = false,
+                        headers = {
+                            ["X-Sessionid"] = session_id,
+                        },
+                    }
+                )
+                assert(res, err)
+                assert(res.status == 200, res.status .. ": " .. res.body)
+                return res.body
+            end
+
+            put_route(false)
+
+            local baseline = {}
+            local baseline_unhealthy = 0
+            local healthy_total = 0
+            local total = 120
+            for i = 1, total do
+                local key = "session-" .. i
+                local port = send(key)
+                baseline[key] = port
+                if port == unhealthy_port then
+                    baseline_unhealthy = baseline_unhealthy + 1
+                else
+                    healthy_total = healthy_total + 1
+                end
+            end
+            assert(baseline_unhealthy > 0, "baseline did not hit unhealthy node")
+            assert(healthy_total > 0, "baseline did not hit healthy nodes")
+
+            put_route(true)
+            send("warmup")
+            ngx.sleep(3)
+
+            local healthy_moved = 0
+            local unhealthy_stayed = 0
+            for i = 1, total do
+                local key = "session-" .. i
+                local before = baseline[key]
+                local after = send(key)
+                if before == unhealthy_port then
+                    if after == unhealthy_port then
+                        unhealthy_stayed = unhealthy_stayed + 1
+                    end
+                elseif after ~= before then
+                    healthy_moved = healthy_moved + 1
+                end
+            end
+
+            assert(healthy_moved == 0,
+                   "healthy-node keys moved after health change: " .. healthy_moved)
+            assert(unhealthy_stayed == 0,
+                   "unhealthy-node keys still routed to unhealthy node: " .. unhealthy_stayed)
+
+            ngx.say("baseline_unhealthy=", baseline_unhealthy,
+                    ", healthy_total=", healthy_total,
+                    ", healthy_moved=", healthy_moved)
+        }
+    }
+--- request
+GET /t
+--- timeout: 30
+--- response_body eval
+qr/baseline_unhealthy=\d+, healthy_total=\d+, healthy_moved=0/
+--- no_error_log
+[error]
diff --git a/t/plugin/ai-proxy-multi-chash-healthcheck-stable-ring.t b/t/plugin/ai-proxy-multi-chash-healthcheck-stable-ring.t
new file mode 100644
index 000000000..147fda63b
--- /dev/null
+++ b/t/plugin/ai-proxy-multi-chash-healthcheck-stable-ring.t
@@ -0,0 +1,252 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+use t::APISIX 'no_plan';
+
+repeat_each(1);
+log_level('warn');
+no_long_string();
+no_shuffle();
+no_root_location();
+
+add_block_preprocessor(sub {
+    my ($block) = @_;
+
+    if (!defined $block->request) {
+        $block->set_value("request", "GET /t");
+    }
+
+    my $user_yaml_config = <<_EOC_;
+plugins:
+  - ai-proxy-multi
+_EOC_
+    $block->set_value("extra_yaml_config", $user_yaml_config);
+
+    my $http_config = <<_EOC_;
+        server {
+            server_name gpu-a;
+            listen 127.0.0.1:16724;
+            default_type 'application/json';
+
+            location /v1/chat/completions {
+                content_by_lua_block {
+                    ngx.say([[{"choices":[{"message":{"content":"gpu-a","role":"assistant"}}]}]])
+                }
+            }
+
+            location /status {
+                content_by_lua_block {
+                    ngx.status = 500
+                    ngx.say("fail")
+                }
+            }
+        }
+
+        server {
+            server_name gpu-b;
+            listen 127.0.0.1:16725;
+            default_type 'application/json';
+
+            location /v1/chat/completions {
+                content_by_lua_block {
+                    ngx.say([[{"choices":[{"message":{"content":"gpu-b","role":"assistant"}}]}]])
+                }
+            }
+
+            location /status {
+                content_by_lua_block {
+                    ngx.say("ok")
+                }
+            }
+        }
+
+        server {
+            server_name gpu-c;
+            listen 127.0.0.1:16726;
+            default_type 'application/json';
+
+            location /v1/chat/completions {
+                content_by_lua_block {
+                    ngx.say([[{"choices":[{"message":{"content":"gpu-c","role":"assistant"}}]}]])
+                }
+            }
+
+            location /status {
+                content_by_lua_block {
+                    ngx.say("ok")
+                }
+            }
+        }
+_EOC_
+    $block->set_value("http_config", $http_config);
+});
+
+run_tests();
+
+__DATA__
+
+=== TEST 1: ai-proxy-multi chash keeps healthy instance mapping stable when health status changes
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local json = require("cjson.safe")
+            local http = require("resty.http")
+
+            local checks = {
+                active = {
+                    type = "http",
+                    timeout = 1,
+                    http_path = "/status",
+                    healthy = {
+                        interval = 1,
+                        http_statuses = {200},
+                        successes = 1,
+                    },
+                    unhealthy = {
+                        interval = 1,
+                        http_statuses = {500},
+                        http_failures = 1,
+                        tcp_failures = 1,
+                        timeouts = 1,
+                    },
+                },
+            }
+
+            local function instance(name, port, weight, with_checks)
+                local ins = {
+                    name = name,
+                    provider = "openai-compatible",
+                    weight = weight,
+                    auth = {
+                        header = {
+                            Authorization = "Bearer token",
+                        },
+                    },
+                    options = {
+                        model = name,
+                    },
+                    override = {
+                        endpoint = "http://127.0.0.1:" .. port .. "/v1/chat/completions",
+                    },
+                }
+                if with_checks then
+                    ins.checks = checks
+                end
+                return ins
+            end
+
+            local function put_route(with_checks)
+                local route = {
+                    uri = "/ai",
+                    plugins = {
+                        ["ai-proxy-multi"] = {
+                            balancer = {
+                                algorithm = "chash",
+                                hash_on = "header",
+                                key = "X-Sessionid",
+                            },
+                            instances = {
+                                instance("gpu-a", 16724, 3, with_checks),
+                                instance("gpu-b", 16725, 6, with_checks),
+                                instance("gpu-c", 16726, 10, with_checks),
+                            },
+                            ssl_verify = false,
+                        },
+                    },
+                }
+
+                local code, body = t('/apisix/admin/routes/1',
+                    ngx.HTTP_PUT,
+                    json.encode(route)
+                )
+                assert(code < 300, body)
+            end
+
+            local function send(session_id)
+                local httpc = http.new()
+                local res, err = httpc:request_uri(
+                    "http://127.0.0.1:" .. ngx.var.server_port .. "/ai",
+                    {
+                        method = "POST",
+                        body = json.encode({messages = {{role = "user", content = "hi"}}}),
+                        keepalive = false,
+                        headers = {
+                            ["Content-Type"] = "application/json",
+                            ["X-Sessionid"] = session_id,
+                        },
+                    }
+                )
+                assert(res, err)
+                assert(res.status == 200, res.status .. ": " .. res.body)
+                local body = assert(json.decode(res.body))
+                return body.choices[1].message.content
+            end
+
+            put_route(false)
+
+            local baseline = {}
+            local baseline_a = 0
+            local healthy_total = 0
+            local total = 120
+            for i = 1, total do
+                local key = "session-" .. i
+                local name = send(key)
+                baseline[key] = name
+                if name == "gpu-a" then
+                    baseline_a = baseline_a + 1
+                else
+                    healthy_total = healthy_total + 1
+                end
+            end
+            assert(baseline_a > 0, "baseline did not hit gpu-a")
+            assert(healthy_total > 0, "baseline did not hit healthy instances")
+
+            put_route(true)
+            send("warmup")
+            ngx.sleep(3)
+
+            local healthy_moved = 0
+            local unhealthy_stayed = 0
+            for i = 1, total do
+                local key = "session-" .. i
+                local before = baseline[key]
+                local after = send(key)
+                if before == "gpu-a" then
+                    if after == "gpu-a" then
+                        unhealthy_stayed = unhealthy_stayed + 1
+                    end
+                elseif after ~= before then
+                    healthy_moved = healthy_moved + 1
+                end
+            end
+
+            assert(healthy_moved == 0,
+                   "healthy-instance keys moved after health change: " .. healthy_moved)
+            assert(unhealthy_stayed == 0,
+                   "unhealthy-instance keys still routed to gpu-a: " .. unhealthy_stayed)
+
+            ngx.say("baseline_a=", baseline_a,
+                    ", healthy_total=", healthy_total,
+                    ", healthy_moved=", healthy_moved)
+        }
+    }
+--- timeout: 30
+--- response_body eval
+qr/baseline_a=\d+, healthy_total=\d+, healthy_moved=0/
+--- no_error_log
+[error]

Reply via email to