This is an automated email from the ASF dual-hosted git repository.

nic443 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix.git


The following commit(s) were added to refs/heads/master by this push:
     new a5d06b326 fix: batch processor cache not working when configure plugin in service (#12474)
a5d06b326 is described below

commit a5d06b3267f0da0b98ef6d685a35fe1cc403c710
Author: Nic <[email protected]>
AuthorDate: Thu Jul 31 09:12:23 2025 +0800

    fix: batch processor cache not working when configure plugin in service (#12474)
    
    Signed-off-by: Nic <[email protected]>
---
 apisix/utils/batch-processor-manager.lua |  5 +-
 apisix/utils/batch-processor.lua         |  3 ++
 t/plugin/kafka-logger.t                  | 80 ++++++++++++++++++++++++++++++++
 3 files changed, 86 insertions(+), 2 deletions(-)

diff --git a/apisix/utils/batch-processor-manager.lua b/apisix/utils/batch-processor-manager.lua
index 4e97bd617..a36492b56 100644
--- a/apisix/utils/batch-processor-manager.lua
+++ b/apisix/utils/batch-processor-manager.lua
@@ -15,6 +15,7 @@
 -- limitations under the License.
 --
 local core = require("apisix.core")
+local plugin = require("apisix.plugin")
 local batch_processor = require("apisix.utils.batch-processor")
 local timer_at = ngx.timer.at
 local pairs = pairs
@@ -107,7 +108,7 @@ function _M:add_entry(conf, entry, max_pending_entries)
     end
     check_stale(self)
 
-    local log_buffer = self.buffers[conf]
+    local log_buffer = self.buffers[plugin.conf_version(conf)]
     if not log_buffer then
         return false
     end
@@ -149,7 +150,7 @@ function _M:add_entry_to_new_processor(conf, entry, ctx, func, max_pending_entri
     end
 
     log_buffer:push(entry)
-    self.buffers[conf] = log_buffer
+    self.buffers[plugin.conf_version(conf)] = log_buffer
     self.total_pushed_entries = self.total_pushed_entries + 1
     return true
 end
diff --git a/apisix/utils/batch-processor.lua b/apisix/utils/batch-processor.lua
index eabee4f9e..eb63f2a26 100644
--- a/apisix/utils/batch-processor.lua
+++ b/apisix/utils/batch-processor.lua
@@ -157,6 +157,9 @@ function batch_processor:new(func, config)
         return nil, "Invalid argument, arg #1 must be a function"
     end
 
+    core.log.debug("creating new batch processor with config: ",
+        core.json.delay_encode(config, true))
+
     local processor = {
         func = func,
         buffer_duration = config.buffer_duration,
diff --git a/t/plugin/kafka-logger.t b/t/plugin/kafka-logger.t
index 4ffef3ba1..90003e20c 100644
--- a/t/plugin/kafka-logger.t
+++ b/t/plugin/kafka-logger.t
@@ -760,3 +760,83 @@ GET /hello
 --- response_body
 hello world
 --- wait: 2
+
+
+
+=== TEST 26: create a service with kafka-logger and three routes bound to it
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/services/1',
+                 ngx.HTTP_PUT,
+                 [[{
+                        "plugins": {
+                            "kafka-logger": {
+                                "broker_list" : {
+                                    "127.0.0.1":9092
+                                },
+                                "kafka_topic" : "test2",
+                                "key" : "key1",
+                                "timeout" : 1,
+                                "batch_max_size": 1,
+                                "include_req_body": true,
+                                "meta_format": "origin"
+                            }
+                        },
+                        "upstream": {
+                            "nodes": {
+                                "127.0.0.1:1980": 1
+                            },
+                            "type": "roundrobin"
+                        }
+                }]]
+                )
+            if code >= 300 then
+                ngx.say("create service failed")
+                return
+            end
+            for i = 1, 3 do
+                local code, body = t('/apisix/admin/routes/' .. i,
+                     ngx.HTTP_PUT,
+                     string.format([[{
+                        "uri": "/hello%d",
+                        "service_id": "1"
+                     }]], i)
+                    )
+                if code >= 300 then
+                    ngx.say("create route failed")
+                    return
+                end
+            end
+            ngx.say("passed")
+        }
+    }
+--- response_body
+passed
+
+
+
+=== TEST 27: hit three routes, should create batch processor only once
+--- log_level: debug
+--- config
+    location /t {
+        content_by_lua_block {
+            local http = require "resty.http"
+            local httpc = http.new()
+            for i = 1, 3 do
+                local resp = httpc:request_uri("http://127.0.0.1:" .. ngx.var.server_port .. "/hello" .. i)
+                if not resp then
+                    ngx.say("failed to request test server")
+                    return
+                end
+            end
+            ngx.say("passed")
+        }
+    }
+--- response_body
+passed
+--- grep_error_log eval
+qr/creating new batch processor with config.*/
+--- grep_error_log_out eval
+qr/creating new batch processor with config.*/

Reply via email to