This is an automated email from the ASF dual-hosted git repository.

nic-6443 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix.git


The following commit(s) were added to refs/heads/master by this push:
     new 5270be15e fix(ai-proxy-multi): refresh cached server picker once health checkers are created (#13505)
5270be15e is described below

commit 5270be15e79322fa3ed27518d31e0bf7acf9cc92
Author: Nic <[email protected]>
AuthorDate: Thu Jun 11 10:35:33 2026 +0800

    fix(ai-proxy-multi): refresh cached server picker once health checkers are created (#13505)
---
 apisix/plugins/ai-proxy-multi.lua                  |  26 +--
 t/plugin/ai-proxy-multi-healthcheck-stale-picker.t | 211 +++++++++++++++++++++
 2 files changed, 226 insertions(+), 11 deletions(-)

diff --git a/apisix/plugins/ai-proxy-multi.lua b/apisix/plugins/ai-proxy-multi.lua
index 0d23ad096..05cfc46ce 100644
--- a/apisix/plugins/ai-proxy-multi.lua
+++ b/apisix/plugins/ai-proxy-multi.lua
@@ -27,6 +27,7 @@ local exporter = require("apisix.plugins.prometheus.exporter")
 local tonumber = tonumber
 local pairs = pairs
 local table_sort = table.sort
+local table_concat = table.concat
 local math_random = math.random
 local ngx_now = ngx.now
 
@@ -330,12 +331,18 @@ local function resolve_endpoint(instance_conf)
 end
 
 
-local function get_checkers_status_ver(checkers)
-    local status_ver_total = 0
-    for _, checker in pairs(checkers) do
-        status_ver_total = status_ver_total + checker.status_ver
-    end
-    return status_ver_total
+local function get_checkers_status_ver(conf, checkers)
+    local parts = core.table.new(#conf.instances, 0)
+    for i, ins in ipairs(conf.instances) do
+        local checker = checkers[ins.name]
+        -- "x" distinguishes "checker not created yet" from a created checker
+        -- whose status_ver is still 0. Otherwise the server picker built
+        -- without health filtering before the checker exists would share the
+        -- same cache key with the post-creation state and be reused even
+        -- after the shm already marks some nodes unhealthy.
+        parts[i] = checker and checker.status_ver or "x"
+    end
+    return table_concat(parts, "-")
 end
 
 
@@ -441,11 +448,8 @@ local function pick_target(ctx, conf, ups_tab)
         end
     end
 
-    local version = plugin.conf_version(conf)
-    if checkers then
-        local status_ver = get_checkers_status_ver(checkers)
-        version = version .. "#" .. status_ver
-    end
+    local version = plugin.conf_version(conf) .. "#" ..
+                    get_checkers_status_ver(conf, checkers)
 
     local server_picker = ctx.server_picker
     if not server_picker then
diff --git a/t/plugin/ai-proxy-multi-healthcheck-stale-picker.t b/t/plugin/ai-proxy-multi-healthcheck-stale-picker.t
new file mode 100644
index 000000000..52195bffc
--- /dev/null
+++ b/t/plugin/ai-proxy-multi-healthcheck-stale-picker.t
@@ -0,0 +1,211 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+use t::APISIX 'no_plan';
+
+log_level("info");
+repeat_each(1);
+no_long_string();
+no_shuffle();
+no_root_location();
+
+
+add_block_preprocessor(sub {
+    my ($block) = @_;
+
+    if (!defined $block->request) {
+        $block->set_value("request", "GET /t");
+    }
+
+    my $user_yaml_config = <<_EOC_;
+plugins:
+  - ai-proxy-multi
+_EOC_
+    $block->set_value("extra_yaml_config", $user_yaml_config);
+
+    my $http_config = $block->http_config // <<_EOC_;
+        server {
+            server_name openai;
+            listen 127.0.0.1:16724;
+
+            default_type 'application/json';
+
+            location /v1/chat/completions {
+                content_by_lua_block {
+                    ngx.say([[{"choices":[{"message":{"content":"ok","role":"assistant"}}]}]])
+                }
+            }
+
+            location /status {
+                content_by_lua_block {
+                    ngx.say("ok")
+                }
+            }
+        }
+_EOC_
+
+    $block->set_value("http_config", $http_config);
+});
+
+run_tests();
+
+__DATA__
+
+=== TEST 1: server picker cached before checker creation must be refreshed after checker exists
+# Reproduce the cache key collision: the first request runs before the
+# healthcheck manager creates the per-instance checkers (they are created
+# asynchronously ~1s after the first fetch_checker call), so it builds and
+# caches a server picker WITHOUT health filtering. If the unhealthy state in
+# shm was produced by another worker before this worker's checkers were
+# created, the local checkers never receive a status-change event and their
+# status_ver stays 0. The picker cache key must still change once the
+# checkers exist, otherwise the unfiltered picker is reused indefinitely and
+# keeps routing requests to the unhealthy instance.
+# Here a "shadow" checker plays the other worker: it seeds the shared shm
+# with an unhealthy state for the dead instance before any traffic arrives.
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local json = require("cjson.safe")
+
+            local checks = {
+                active = {
+                    type = "http",
+                    timeout = 1,
+                    http_path = "/status",
+                    healthy = {
+                        interval = 30,
+                        successes = 1,
+                    },
+                    unhealthy = {
+                        interval = 30,
+                        http_failures = 2,
+                        tcp_failures = 2,
+                        timeouts = 2,
+                    },
+                },
+            }
+
+            local route = {
+                uri = "/ai",
+                plugins = {
+                    ["ai-proxy-multi"] = {
+                        fallback_strategy = "instance_health_and_rate_limiting",
+                        instances = {
+                            {
+                                name = "dead",
+                                provider = "openai",
+                                weight = 1,
+                                auth = {
+                                    header = {
+                                        Authorization = "Bearer token",
+                                    },
+                                },
+                                options = {
+                                    model = "gpt-4",
+                                },
+                                override = {
+                                    -- nothing listens on this port
+                                    endpoint = "http://127.0.0.1:16725",
+                                },
+                                checks = checks,
+                            },
+                            {
+                                name = "ok",
+                                provider = "openai",
+                                weight = 1,
+                                auth = {
+                                    header = {
+                                        Authorization = "Bearer token",
+                                    },
+                                },
+                                options = {
+                                    model = "gpt-4",
+                                },
+                                override = {
+                                    endpoint = "http://127.0.0.1:16724",
+                                },
+                                checks = checks,
+                            },
+                        },
+                        ssl_verify = false,
+                    },
+                },
+            }
+
+            local code, body = t('/apisix/admin/routes/1',
+                 ngx.HTTP_PUT,
+                 json.encode(route)
+            )
+            assert(code < 300, body)
+
+            -- seed the shared shm as another worker would have: every target
+            -- already exists (so add_target in this worker raises no event)
+            -- and the dead instance is already marked unhealthy before this
+            -- worker's checker objects exist
+            local healthcheck = require("resty.healthcheck")
+            local targets = {
+                -- {instance index in json path, port, is_healthy}
+                {0, 16725, false},
+                {1, 16724, true},
+            }
+            for _, target in ipairs(targets) do
+                local shadow = healthcheck.new({
+                    name = "upstream#/apisix/routes/1#plugins['ai-proxy-multi'].instances["
+                           .. target[1] .. "]",
+                    shm_name = "upstream-healthcheck",
+                    events_module = "resty.events",
+                    checks = checks,
+                })
+                assert(shadow:add_target("127.0.0.1", target[2], nil, target[3]))
+                shadow:stop()
+            end
+
+            local function send_ai()
+                local code = t("/ai",
+                    ngx.HTTP_POST,
+                    json.encode({messages = {{role = "user", content = "hi"}}}),
+                    nil,
+                    {
+                        ["Content-Type"] = "application/json",
+                    }
+                )
+                return code
+            end
+
+            -- builds and caches the server picker while no checker exists
+            -- yet, and queues the checker creation
+            send_ai()
+
+            -- let the healthcheck manager timer create the checkers; no
+            -- status change happens afterwards, so no event ever bumps
+            -- status_ver in this worker
+            ngx.sleep(2)
+
+            -- the unhealthy instance must not be picked anymore
+            for i = 1, 4 do
+                local code = send_ai()
+                assert(code == 200, "request " .. i .. " got status " .. code
+                       .. ": unhealthy instance still picked")
+            end
+            ngx.say("passed")
+        }
+    }
+--- timeout: 10
+--- response_body
+passed

Reply via email to