This is an automated email from the ASF dual-hosted git repository.

ashishtiwari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix.git


The following commit(s) were added to refs/heads/master by this push:
     new e2e6829cd feat: replace events library with shdict (#12353)
e2e6829cd is described below

commit e2e6829cd4a75c56daf5526abe03da4689fecac3
Author: Ashish Tiwari <ashishjaitiwari15112...@gmail.com>
AuthorDate: Thu Jun 26 13:57:41 2025 +0530

    feat: replace events library with shdict (#12353)
---
 apisix/cli/ngx_tpl.lua          |   1 +
 apisix/discovery/nacos/init.lua | 118 +++++++------------
 t/APISIX.pm                     |   1 +
 t/discovery/nacos.t             | 243 +++++++++++++++++++++++++++++++---------
 t/discovery/nacos2.t            |  53 ++++++---
 5 files changed, 275 insertions(+), 141 deletions(-)

diff --git a/apisix/cli/ngx_tpl.lua b/apisix/cli/ngx_tpl.lua
index 0e95097c5..5dd739b62 100644
--- a/apisix/cli/ngx_tpl.lua
+++ b/apisix/cli/ngx_tpl.lua
@@ -73,6 +73,7 @@ lua {
     {% if status then %}
     lua_shared_dict status-report {* meta.lua_shared_dict["status-report"] *};
     {% end %}
+    lua_shared_dict nacos 10m;
 }
 
 {% if enabled_stream_plugins["prometheus"] and not enable_http then %}
diff --git a/apisix/discovery/nacos/init.lua b/apisix/discovery/nacos/init.lua
index 86e9d4125..d4fec7977 100644
--- a/apisix/discovery/nacos/init.lua
+++ b/apisix/discovery/nacos/init.lua
@@ -20,6 +20,7 @@ local local_conf         = 
require('apisix.core.config_local').local_conf()
 local http               = require('resty.http')
 local core               = require('apisix.core')
 local ipairs             = ipairs
+local pairs              = pairs
 local type               = type
 local math               = math
 local math_random        = math.random
@@ -34,7 +35,11 @@ local str_find           = core.string.find
 local log                = core.log
 
 local default_weight
-local applications
+local nacos_dict = ngx.shared.nacos --key: namespace_id.group_name.service_name
+if not nacos_dict then
+    error("lua_shared_dict \"nacos\" not configured")
+end
+
 local auth_path = 'auth/login'
 local instance_list_path = 'ns/instance/list?healthyOnly=true&serviceName='
 local default_namespace_id = "public"
@@ -42,20 +47,13 @@ local default_group_name = "DEFAULT_GROUP"
 local access_key
 local secret_key
 
-local events
-local events_list
-
 
 local _M = {}
 
-local function discovery_nacos_callback(data, event, source, pid)
-    applications = data
-    log.notice("update local variable application, event is: ", event,
-               "source: ", source, "server pid:", pid,
-               ", application: ", core.json.encode(applications, true))
+local function get_key(namespace_id, group_name, service_name)
+    return namespace_id .. '.' .. group_name .. '.' .. service_name
 end
 
-
 local function request(request_uri, path, body, method, basic_auth)
     local url = request_uri .. path
     log.info('request url:', url)
@@ -278,29 +276,24 @@ local function is_grpc(scheme)
     return false
 end
 
-
+local curr_service_in_use = {}
 local function fetch_full_registry(premature)
     if premature then
         return
     end
 
-    local up_apps = {}
     local base_uri, username, password = get_base_uri()
     local token_param, err = get_token_param(base_uri, username, password)
     if err then
         log.error('get_token_param error:', err)
-        if not applications then
-            applications = up_apps
-        end
         return
     end
 
     local infos = get_nacos_services()
     if #infos == 0 then
-        applications = up_apps
         return
     end
-
+    local service_names = {}
     for _, service_info in ipairs(infos) do
         local data, err
         local namespace_id = service_info.namespace_id
@@ -318,29 +311,15 @@ local function fetch_full_registry(premature)
             goto CONTINUE
         end
 
-        if not up_apps[namespace_id] then
-            up_apps[namespace_id] = {}
-        end
-
-        if not up_apps[namespace_id][group_name] then
-            up_apps[namespace_id][group_name] = {}
-        end
-
+        local nodes = {}
+        local key = get_key(namespace_id, group_name, 
service_info.service_name)
+        service_names[key] = true
         for _, host in ipairs(data.hosts) do
-            local nodes = up_apps[namespace_id]
-                [group_name][service_info.service_name]
-            if not nodes then
-                nodes = {}
-                up_apps[namespace_id]
-                    [group_name][service_info.service_name] = nodes
-            end
-
             local node = {
                 host = host.ip,
                 port = host.port,
                 weight = host.weight or default_weight,
             }
-
             -- docs: 
https://github.com/yidongnan/grpc-spring-boot-starter/pull/496
             if is_grpc(scheme) and host.metadata and host.metadata.gRPC_port 
then
                 node.port = host.metadata.gRPC_port
@@ -348,21 +327,19 @@ local function fetch_full_registry(premature)
 
             core.table.insert(nodes, node)
         end
-
+        if #nodes > 0 then
+            local content = core.json.encode(nodes)
+            nacos_dict:set(key, content)
+        end
         ::CONTINUE::
     end
-    local new_apps_md5sum = ngx.md5(core.json.encode(up_apps))
-    local old_apps_md5sum = ngx.md5(core.json.encode(applications))
-    if new_apps_md5sum == old_apps_md5sum then
-        return
-    end
-    applications = up_apps
-    local ok, err = events:post(events_list._source, events_list.updating,
-                                applications)
-    if not ok then
-        log.error("post_event failure with ", events_list._source,
-                  ", update application error: ", err)
+    -- remove services that are not in use anymore
+    for key, _ in pairs(curr_service_in_use) do
+        if not service_names[key] then
+            nacos_dict:delete(key)
+        end
     end
+    curr_service_in_use = service_names
 end
 
 
@@ -371,40 +348,18 @@ function _M.nodes(service_name, discovery_args)
             discovery_args.namespace_id or default_namespace_id
     local group_name = discovery_args
             and discovery_args.group_name or default_group_name
-
-    local logged = false
-    -- maximum waiting time: 5 seconds
-    local waiting_time = 5
-    local step = 0.1
-    while not applications and waiting_time > 0 do
-        if not logged then
-            log.warn('wait init')
-            logged = true
-        end
-        ngx.sleep(step)
-        waiting_time = waiting_time - step
-    end
-
-    if not applications or not applications[namespace_id]
-        or not applications[namespace_id][group_name]
-    then
+    local key = get_key(namespace_id, group_name, service_name)
+    local value = nacos_dict:get(key)
+    if not value then
+        core.log.error("nacos service not found: ", service_name)
         return nil
     end
-    return applications[namespace_id][group_name][service_name]
+    local nodes = core.json.decode(value)
+    return nodes
 end
 
 
 function _M.init_worker()
-    events = require("apisix.events")
-    events_list = events:event_list("discovery_nacos_update_application",
-                                    "updating")
-
-    if 0 ~= ngx.worker.id() then
-        events:register(discovery_nacos_callback, events_list._source,
-                        events_list.updating)
-        return
-    end
-
     default_weight = local_conf.discovery.nacos.weight
     log.info('default_weight:', default_weight)
     local fetch_interval = local_conf.discovery.nacos.fetch_interval
@@ -417,7 +372,20 @@ end
 
 
 function _M.dump_data()
-    return {config = local_conf.discovery.nacos, services = applications or {}}
+    local keys = nacos_dict:get_keys(0)
+    local applications = {}
+    for _, key in ipairs(keys) do
+        local value = nacos_dict:get(key)
+        if value then
+            local nodes = core.json.decode(value)
+            if nodes then
+                applications[key] = {
+                    nodes = nodes,
+                }
+            end
+        end
+    end
+    return {services = applications or {}}
 end
 
 
diff --git a/t/APISIX.pm b/t/APISIX.pm
index 0ec20a1d9..d8480c47d 100644
--- a/t/APISIX.pm
+++ b/t/APISIX.pm
@@ -292,6 +292,7 @@ lua {
     lua_shared_dict prometheus-metrics 15m;
     lua_shared_dict standalone-config 10m;
     lua_shared_dict status-report 1m;
+    lua_shared_dict nacos 10m;
 }
 _EOC_
     }
diff --git a/t/discovery/nacos.t b/t/discovery/nacos.t
index 9af1ee14a..f2ebee57e 100644
--- a/t/discovery/nacos.t
+++ b/t/discovery/nacos.t
@@ -65,6 +65,12 @@ discovery:
 
 _EOC_
 
+add_block_preprocessor(sub {
+    my ($block) = @_;
+    $block->set_value("timeout", "10");
+
+});
+
 run_tests();
 
 __DATA__
@@ -127,18 +133,43 @@ routes:
       type: roundrobin
 
 #END
---- pipelined_requests eval
-[
-    "GET /hello",
-    "GET /hello",
-]
---- response_body_like eval
-[
-    qr/server [1-2]/,
-    qr/server [1-2]/,
-]
---- no_error_log
-[error, error]
+--- config
+    location /t {
+        content_by_lua_block {
+            local http = require("resty.http")
+            local uri = "http://127.0.0.1:"; .. ngx.var.server_port
+
+            -- Wait for 2 seconds for APISIX initialization
+            ngx.sleep(2)
+            local httpc = http.new()
+            local valid_responses = 0
+
+            for i = 1, 2 do
+                local res, err = httpc:request_uri(uri .. "/hello")
+                if not res then
+                    ngx.log(ngx.ERR, "Request failed: ", err)
+                else
+                    -- Clean and validate response
+                    local clean_body = res.body:gsub("%s+$", "")
+                    if clean_body == "server 1" or clean_body == "server 2" 
then
+                        valid_responses = valid_responses + 1
+                    else
+                        ngx.log(ngx.ERR, "Invalid response: ", clean_body)
+                    end
+                end
+            end
+            -- Final check
+            if valid_responses == 2 then
+                ngx.say("PASS")
+            else
+                ngx.say("FAIL: only ", valid_responses, " valid responses")
+            end
+        }
+    }
+--- request
+GET /t
+--- response_body
+PASS
 
 
 
@@ -245,16 +276,43 @@ discovery:
       host:
         - "http://127.0.0.1:8858";
       fetch_interval: 1
---- pipelined_requests eval
-[
-    "GET /hello",
-    "GET /hello",
-]
---- response_body_like eval
-[
-    qr/server [1-2]/,
-    qr/server [1-2]/,
-]
+--- config
+    location /t {
+        content_by_lua_block {
+            local http = require("resty.http")
+            local uri = "http://127.0.0.1:"; .. ngx.var.server_port
+
+            -- Wait for 2 seconds for APISIX initialization
+            ngx.sleep(2)
+            local httpc = http.new()
+            local valid_responses = 0
+
+            for i = 1, 2 do
+                local res, err = httpc:request_uri(uri .. "/hello")
+                if not res then
+                    ngx.log(ngx.ERR, "Request failed: ", err)
+                else
+                    -- Clean and validate response
+                    local clean_body = res.body:gsub("%s+$", "")
+                    if clean_body == "server 1" or clean_body == "server 2" 
then
+                        valid_responses = valid_responses + 1
+                    else
+                        ngx.log(ngx.ERR, "Invalid response: ", clean_body)
+                    end
+                end
+            end
+            -- Final check
+            if valid_responses == 2 then
+                ngx.say("PASS")
+            else
+                ngx.say("FAIL: only ", valid_responses, " valid responses")
+            end
+        }
+    }
+--- request
+GET /t
+--- response_body
+PASS
 
 
 
@@ -393,16 +451,43 @@ discovery:
       host:
         - "http://127.0.0.1:8858";
       fetch_interval: 1
---- pipelined_requests eval
-[
-    "GET /hello",
-    "GET /hello",
-]
---- response_body_like eval
-[
-    qr/server [1-2]/,
-    qr/server [1-2]/,
-]
+--- config
+    location /t {
+        content_by_lua_block {
+            local http = require("resty.http")
+            local uri = "http://127.0.0.1:"; .. ngx.var.server_port
+
+            -- Wait for 2 seconds for APISIX initialization
+            ngx.sleep(2)
+            local httpc = http.new()
+            local valid_responses = 0
+
+            for i = 1, 2 do
+                local res, err = httpc:request_uri(uri .. "/hello")
+                if not res then
+                    ngx.log(ngx.ERR, "Request failed: ", err)
+                else
+                    -- Clean and validate response
+                    local clean_body = res.body:gsub("%s+$", "")
+                    if clean_body == "server 1" or clean_body == "server 2" 
then
+                        valid_responses = valid_responses + 1
+                    else
+                        ngx.log(ngx.ERR, "Invalid response: ", clean_body)
+                    end
+                end
+            end
+            -- Final check
+            if valid_responses == 2 then
+                ngx.say("PASS")
+            else
+                ngx.say("FAIL: only ", valid_responses, " valid responses")
+            end
+        }
+    }
+--- request
+GET /t
+--- response_body
+PASS
 
 
 
@@ -541,16 +626,43 @@ discovery:
       host:
         - "http://127.0.0.1:8858";
       fetch_interval: 1
---- pipelined_requests eval
-[
-    "GET /hello",
-    "GET /hello",
-]
---- response_body_like eval
-[
-    qr/server [1-2]/,
-    qr/server [1-2]/,
-]
+--- config
+    location /t {
+        content_by_lua_block {
+            local http = require("resty.http")
+            local uri = "http://127.0.0.1:"; .. ngx.var.server_port
+
+            -- Wait for 2 seconds for APISIX initialization
+            ngx.sleep(2)
+            local httpc = http.new()
+            local valid_responses = 0
+
+            for i = 1, 2 do
+                local res, err = httpc:request_uri(uri .. "/hello")
+                if not res then
+                    ngx.log(ngx.ERR, "Request failed: ", err)
+                else
+                    -- Clean and validate response
+                    local clean_body = res.body:gsub("%s+$", "")
+                    if clean_body == "server 1" or clean_body == "server 2" 
then
+                        valid_responses = valid_responses + 1
+                    else
+                        ngx.log(ngx.ERR, "Invalid response: ", clean_body)
+                    end
+                end
+            end
+            -- Final check
+            if valid_responses == 2 then
+                ngx.say("PASS")
+            else
+                ngx.say("FAIL: only ", valid_responses, " valid responses")
+            end
+        }
+    }
+--- request
+GET /t
+--- response_body
+PASS
 
 
 
@@ -737,16 +849,43 @@ discovery:
       host:
         - "http://127.0.0.1:8858";
       fetch_interval: 1
---- pipelined_requests eval
-[
-    "GET /hello",
-    "GET /hello",
-]
---- response_body_like eval
-[
-    qr/server [1-2]/,
-    qr/server [1-2]/,
-]
+--- config
+    location /t {
+        content_by_lua_block {
+            local http = require("resty.http")
+            local uri = "http://127.0.0.1:"; .. ngx.var.server_port
+
+            -- Wait for 2 seconds for APISIX initialization
+            ngx.sleep(2)
+            local httpc = http.new()
+            local valid_responses = 0
+
+            for i = 1, 2 do
+                local res, err = httpc:request_uri(uri .. "/hello")
+                if not res then
+                    ngx.log(ngx.ERR, "Request failed: ", err)
+                else
+                    -- Clean and validate response
+                    local clean_body = res.body:gsub("%s+$", "")
+                    if clean_body == "server 1" or clean_body == "server 2" 
then
+                        valid_responses = valid_responses + 1
+                    else
+                        ngx.log(ngx.ERR, "Invalid response: ", clean_body)
+                    end
+                end
+            end
+            -- Final check
+            if valid_responses == 2 then
+                ngx.say("PASS")
+            else
+                ngx.say("FAIL: only ", valid_responses, " valid responses")
+            end
+        }
+    }
+--- request
+GET /t
+--- response_body
+PASS
 
 
 
diff --git a/t/discovery/nacos2.t b/t/discovery/nacos2.t
index e7dc8f973..e45d22c43 100644
--- a/t/discovery/nacos2.t
+++ b/t/discovery/nacos2.t
@@ -148,18 +148,43 @@ routes:
       type: roundrobin
 
 #END
---- pipelined_requests eval
-[
-    "GET /hello",
-    "GET /hello",
-]
---- response_body_like eval
-[
-    qr/server [1-2]/,
-    qr/server [1-2]/,
-]
---- no_error_log
-[error, error]
+--- config
+    location /t {
+        content_by_lua_block {
+            local http = require("resty.http")
+            local uri = "http://127.0.0.1:"; .. ngx.var.server_port
+
+            -- Wait for 2 seconds for APISIX initialization
+            ngx.sleep(2)
+            local httpc = http.new()
+            local valid_responses = 0
+
+            for i = 1, 2 do
+                local res, err = httpc:request_uri(uri .. "/hello")
+                if not res then
+                    ngx.log(ngx.ERR, "Request failed: ", err)
+                else
+                    -- Clean and validate response
+                    local clean_body = res.body:gsub("%s+$", "")
+                    if clean_body == "server 1" or clean_body == "server 2" 
then
+                        valid_responses = valid_responses + 1
+                    else
+                        ngx.log(ngx.ERR, "Invalid response: ", clean_body)
+                    end
+                end
+            end
+            -- Final check
+            if valid_responses == 2 then
+                ngx.say("PASS")
+            else
+                ngx.say("FAIL: only ", valid_responses, " valid responses")
+            end
+        }
+    }
+--- request
+GET /t
+--- response_body
+PASS
 
 
 
@@ -308,8 +333,8 @@ discovery:
 
             local body = json_decode(res.body)
             local services = body.services
-            local service = services["public"]["DEFAULT_GROUP"]["APISIX-NACOS"]
-            local number = table.getn(service)
+            local service = services["public.DEFAULT_GROUP.APISIX-NACOS"]
+            local number = table.getn(service.nodes)
             ngx.say(number)
         }
     }

Reply via email to