This is an automated email from the ASF dual-hosted git repository.

nic-6443 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix.git


The following commit(s) were added to refs/heads/master by this push:
     new 86ac31202 fix(kafka-logger): support api_version so brokers can store message timestamps (#13521)
86ac31202 is described below

commit 86ac3120282889da9870718a454b6c60b3ddf2f6
Author: Nic <[email protected]>
AuthorDate: Fri Jun 12 10:48:58 2026 +0800

    fix(kafka-logger): support api_version so brokers can store message timestamps (#13521)
---
 apisix/plugins/kafka-logger.lua        |   8 +++
 docs/en/latest/plugins/kafka-logger.md |   1 +
 docs/zh/latest/plugins/kafka-logger.md |   1 +
 t/plugin/kafka-logger.t                | 111 +++++++++++++++++++++++++++++++++
 4 files changed, 121 insertions(+)

diff --git a/apisix/plugins/kafka-logger.lua b/apisix/plugins/kafka-logger.lua
index 318d21e92..014898c5e 100644
--- a/apisix/plugins/kafka-logger.lua
+++ b/apisix/plugins/kafka-logger.lua
@@ -128,6 +128,13 @@ local schema = {
         producer_max_buffering = {type = "integer", minimum = 1, default = 50000},
         producer_time_linger = {type = "integer", minimum = 1, default = 1},
         meta_refresh_interval = {type = "integer", minimum = 1, default = 30},
+        -- Produce API version used when sending messages; only version 2
+        -- carries the message timestamp, which lets brokers store it
+        api_version = {
+            type = "integer",
+            default = 1,
+            enum = {0, 1, 2},
+        },
     },
     oneOf = {
         { required = {"broker_list", "kafka_topic"},},
@@ -267,6 +274,7 @@ function _M.log(conf, ctx)
     broker_config["max_buffering"] = conf.producer_max_buffering
     broker_config["flush_time"] = conf.producer_time_linger * 1000
     broker_config["refresh_interval"] = conf.meta_refresh_interval * 1000
+    broker_config["api_version"] = conf.api_version
 
     local prod, err = core.lrucache.plugin_ctx(lrucache, ctx, nil, create_producer,
                                                broker_list, broker_config, conf.cluster_name)
diff --git a/docs/en/latest/plugins/kafka-logger.md b/docs/en/latest/plugins/kafka-logger.md
index f7d208fa5..defd99397 100644
--- a/docs/en/latest/plugins/kafka-logger.md
+++ b/docs/en/latest/plugins/kafka-logger.md
@@ -69,6 +69,7 @@ It might take some time to receive the log data. It will be 
automatically sent a
 | producer_max_buffering           | integer | False    | 50000          | 
[1,...]                                           | `max_buffering` parameter 
in [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) 
representing maximum buffer size. Unit is message count.                        
                                                                                
                                                                                
   |
 | producer_time_linger             | integer | False    | 1              | 
[1,...]                                           | `flush_time` parameter in 
[lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) in seconds.    
                                                                                
                                                                                
                                                                      |
 | meta_refresh_interval            | integer | False    | 30             | 
[1,...]                                           | `refresh_interval` 
parameter in [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) 
that specifies the interval to auto-refresh the metadata, in seconds.           
                                                                                
                                                                               |
+| api_version                      | integer | False    | 1              | [0, 1, 2]                                         | `api_version` parameter in [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) that specifies the version of the Kafka Produce API. Set to `2` to make brokers store message timestamps; otherwise the timestamps are stored as `-1` (shown as `1970-01-01` in some consumers). Requires Kafka 0.10 or later.                       |
 
 This Plugin supports using batch processors to aggregate and process entries 
(logs/data) in a batch. This avoids the need for frequently submitting the 
data. The batch processor submits data every `5` seconds or when the data in 
the queue reaches `1000`. See [Batch 
Processor](../batch-processor.md#configuration) for more information or setting 
your custom configuration.
 
diff --git a/docs/zh/latest/plugins/kafka-logger.md b/docs/zh/latest/plugins/kafka-logger.md
index 3b3623356..b67fa3710 100644
--- a/docs/zh/latest/plugins/kafka-logger.md
+++ b/docs/zh/latest/plugins/kafka-logger.md
@@ -69,6 +69,7 @@ description: kafka-logger 插件将请求和响应日志作为 JSON 对象批量
 | producer_max_buffering           | integer | 否       | 50000           | 
[1,...]                                           | 对应 
[lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) 中的 
`max_buffering` 参数,表示最大缓冲区大小,单位为条。                                              
                                                                                
                 |
 | producer_time_linger             | integer | 否       | 1               | 
[1,...]                                           | 对应 
[lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) 中的 
`flush_time` 参数,单位为秒。                                                           
                                                                                
                           |
 | meta_refresh_interval            | integer | 否       | 30              | 
[1,...]                                           | 对应 
[lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) 中的 
`refresh_interval` 参数,用于指定自动刷新 metadata 的间隔时长,单位为秒。                             
                                                                                
             |
+| api_version                      | integer | 否       | 1               | [0, 1, 2]                                         | 对应 [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) 中的 `api_version` 参数,用于指定 Kafka Produce API 的版本。设置为 `2` 时 broker 才会存储消息的时间戳,否则时间戳为 `-1`(部分消费端会显示为 `1970-01-01`)。需要 Kafka 0.10 及以上版本。               |
 
 该插件支持使用批处理器来聚合并批量处理条目(日志/数据),避免频繁提交数据。默认情况下,批处理器每 `5` 秒或队列中的数据达到 `1000` 
条时提交数据。如需了解批处理器相关参数设置,请参考[批处理器](../batch-processor.md#配置)配置部分。
 
diff --git a/t/plugin/kafka-logger.t b/t/plugin/kafka-logger.t
index 7d8481917..6e355ba81 100644
--- a/t/plugin/kafka-logger.t
+++ b/t/plugin/kafka-logger.t
@@ -875,3 +875,114 @@ passed
 qr/creating new batch processor with config.*/
 --- grep_error_log_out eval
 qr/creating new batch processor with config.*/
+
+
+
+=== TEST 28: check api_version schema: 2 is accepted, 3 is rejected
+--- config
+    location /t {
+        content_by_lua_block {
+            local plugin = require("apisix.plugins.kafka-logger")
+            local ok, err = plugin.check_schema({
+                broker_list = {
+                    ["127.0.0.1"] = 9092
+                },
+                kafka_topic = "test",
+                api_version = 2
+            })
+            if not ok then
+                ngx.say(err)
+            end
+
+            local ok, err = plugin.check_schema({
+                broker_list = {
+                    ["127.0.0.1"] = 9092
+                },
+                kafka_topic = "test",
+                api_version = 3
+            })
+            if not ok then
+                ngx.say(err)
+            end
+            ngx.say("done")
+        }
+    }
+--- response_body
+property "api_version" validation failed: matches none of the enum values
+done
+
+
+
+=== TEST 29: report log to kafka with api_version = 2, the broker should store the message timestamp
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/1',
+                 ngx.HTTP_PUT,
+                 [[{
+                        "plugins": {
+                            "kafka-logger": {
+                                "broker_list" : {
+                                    "127.0.0.1":9092
+                                },
+                                "kafka_topic" : "test2",
+                                "producer_type": "sync",
+                                "timeout" : 1,
+                                "batch_max_size": 1,
+                                "api_version": 2
+                            }
+                        },
+                        "upstream": {
+                            "nodes": {
+                                "127.0.0.1:1980": 1
+                            },
+                            "type": "roundrobin"
+                        },
+                        "uri": "/hello"
+                }]]
+                )
+            if code >= 300 then
+                ngx.status = code
+                ngx.say(body)
+                return
+            end
+            ngx.sleep(0.5)
+
+            -- record the current end offset of the topic before sending the log
+            local bconsumer = require("resty.kafka.basic-consumer")
+            local pconsumer = require("resty.kafka.protocol.consumer")
+            local broker_list = {{host = "127.0.0.1", port = 9092}}
+            local consumer = bconsumer:new(broker_list, {})
+            local offset, err = consumer:list_offset("test2", 0,
+                                                     pconsumer.LIST_OFFSET_TIMESTAMP_LAST)
+            if not offset then
+                ngx.say("failed to list offset: ", err)
+                return
+            end
+            offset = tonumber(tostring(offset):match("^%-?%d+"))
+
+            -- hit the route to send the log to kafka
+            t('/hello', ngx.HTTP_GET)
+            ngx.sleep(2)
+
+            local data, err = consumer:fetch("test2", 0, offset)
+            if not data then
+                ngx.say("failed to fetch message: ", err)
+                return
+            end
+            local message = data.records[1]
+            if not message then
+                ngx.say("no message fetched")
+                return
+            end
+            if tonumber(message.timestamp) > 0 then
+                ngx.say("message timestamp is stored")
+            else
+                ngx.say("invalid message timestamp: ", tostring(message.timestamp))
+            end
+        }
+    }
+--- timeout: 10
+--- response_body
+message timestamp is stored

Reply via email to