This is an automated email from the ASF dual-hosted git repository.
nic443 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix.git
The following commit(s) were added to refs/heads/master by this push:
new a5d06b326 fix: batch processor cache not working when configure plugin
in service (#12474)
a5d06b326 is described below
commit a5d06b3267f0da0b98ef6d685a35fe1cc403c710
Author: Nic <[email protected]>
AuthorDate: Thu Jul 31 09:12:23 2025 +0800
fix: batch processor cache not working when configure plugin in service
(#12474)
Signed-off-by: Nic <[email protected]>
---
apisix/utils/batch-processor-manager.lua | 5 +-
apisix/utils/batch-processor.lua | 3 ++
t/plugin/kafka-logger.t | 80 ++++++++++++++++++++++++++++++++
3 files changed, 86 insertions(+), 2 deletions(-)
diff --git a/apisix/utils/batch-processor-manager.lua b/apisix/utils/batch-processor-manager.lua
index 4e97bd617..a36492b56 100644
--- a/apisix/utils/batch-processor-manager.lua
+++ b/apisix/utils/batch-processor-manager.lua
@@ -15,6 +15,7 @@
-- limitations under the License.
--
local core = require("apisix.core")
+local plugin = require("apisix.plugin")
local batch_processor = require("apisix.utils.batch-processor")
local timer_at = ngx.timer.at
local pairs = pairs
@@ -107,7 +108,7 @@ function _M:add_entry(conf, entry, max_pending_entries)
end
check_stale(self)
- local log_buffer = self.buffers[conf]
+ local log_buffer = self.buffers[plugin.conf_version(conf)]
if not log_buffer then
return false
end
@@ -149,7 +150,7 @@ function _M:add_entry_to_new_processor(conf, entry, ctx, func, max_pending_entri
end
log_buffer:push(entry)
- self.buffers[conf] = log_buffer
+ self.buffers[plugin.conf_version(conf)] = log_buffer
self.total_pushed_entries = self.total_pushed_entries + 1
return true
end
diff --git a/apisix/utils/batch-processor.lua b/apisix/utils/batch-processor.lua
index eabee4f9e..eb63f2a26 100644
--- a/apisix/utils/batch-processor.lua
+++ b/apisix/utils/batch-processor.lua
@@ -157,6 +157,9 @@ function batch_processor:new(func, config)
return nil, "Invalid argument, arg #1 must be a function"
end
+ core.log.debug("creating new batch processor with config: ",
+ core.json.delay_encode(config, true))
+
local processor = {
func = func,
buffer_duration = config.buffer_duration,
diff --git a/t/plugin/kafka-logger.t b/t/plugin/kafka-logger.t
index 4ffef3ba1..90003e20c 100644
--- a/t/plugin/kafka-logger.t
+++ b/t/plugin/kafka-logger.t
@@ -760,3 +760,83 @@ GET /hello
--- response_body
hello world
--- wait: 2
+
+
+
+=== TEST 26: create a service with kafka-logger and three routes bound to it
+--- config
+ location /t {
+ content_by_lua_block {
+ local t = require("lib.test_admin").test
+ local code, body = t('/apisix/admin/services/1',
+ ngx.HTTP_PUT,
+ [[{
+ "plugins": {
+ "kafka-logger": {
+ "broker_list" : {
+ "127.0.0.1":9092
+ },
+ "kafka_topic" : "test2",
+ "key" : "key1",
+ "timeout" : 1,
+ "batch_max_size": 1,
+ "include_req_body": true,
+ "meta_format": "origin"
+ }
+ },
+ "upstream": {
+ "nodes": {
+ "127.0.0.1:1980": 1
+ },
+ "type": "roundrobin"
+ }
+ }]]
+ )
+ if code >= 300 then
+ ngx.say("create service failed")
+ return
+ end
+ for i = 1, 3 do
+ local code, body = t('/apisix/admin/routes/' .. i,
+ ngx.HTTP_PUT,
+ string.format([[{
+ "uri": "/hello%d",
+ "service_id": "1"
+ }]], i)
+ )
+ if code >= 300 then
+ ngx.say("create route failed")
+ return
+ end
+ end
+ ngx.say("passed")
+ }
+ }
+--- response_body
+passed
+
+
+
+=== TEST 27: hit three routes, should create batch processor only once
+--- log_level: debug
+--- config
+ location /t {
+ content_by_lua_block {
+ local http = require "resty.http"
+ local httpc = http.new()
+ for i = 1, 3 do
+ local resp = httpc:request_uri("http://127.0.0.1:" .. ngx.var.server_port .. "/hello" .. i)
+ if not resp then
+ ngx.say("failed to request test server")
+ return
+ end
+ end
+ ngx.say("passed")
+ }
+ }
+--- response_body
+passed
+--- grep_error_log eval
+qr/creating new batch processor with config.*/
+--- grep_error_log_out eval
+qr/creating new batch processor with config.*/