This is an automated email from the ASF dual-hosted git repository.
Yilialinn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix.git
The following commit(s) were added to refs/heads/master by this push:
new 5024b9a49 docs: update kafka-logger plugin documentation (#13278)
5024b9a49 is described below
commit 5024b9a492b63f0447c62954868037105c9fb4cb
Author: Yilia Lin <[email protected]>
AuthorDate: Thu Apr 30 11:58:40 2026 +0800
docs: update kafka-logger plugin documentation (#13278)
---
docs/en/latest/plugins/kafka-logger.md | 428 ++++++++++++++++++++------
docs/zh/latest/plugins/kafka-logger.md | 544 +++++++++++++++++++++++----------
2 files changed, 721 insertions(+), 251 deletions(-)
diff --git a/docs/en/latest/plugins/kafka-logger.md b/docs/en/latest/plugins/kafka-logger.md
index eeaf3f2f0..f7d208fa5 100644
--- a/docs/en/latest/plugins/kafka-logger.md
+++ b/docs/en/latest/plugins/kafka-logger.md
@@ -5,7 +5,7 @@ keywords:
- API Gateway
- Plugin
- Kafka Logger
-description: This document contains information about the Apache APISIX kafka-logger Plugin.
+description: The kafka-logger Plugin pushes request and response logs as JSON objects to Apache Kafka clusters in batches, allowing for customizable log formats to enhance data management.
---
<!--
@@ -27,44 +27,48 @@ description: This document contains information about the Apache APISIX kafka-lo
#
-->
+<head>
+ <link rel="canonical" href="https://docs.api7.ai/hub/kafka-logger" />
+</head>
+
## Description
-The `kafka-logger` Plugin is used to push logs as JSON objects to Apache Kafka clusters. It works as a Kafka client driver for the ngx_lua Nginx module.
+The `kafka-logger` Plugin pushes request and response logs as JSON objects to Apache Kafka clusters in batches and supports the customization of log formats.
It might take some time to receive the log data. It will be automatically sent after the timer function in the [batch processor](../batch-processor.md) expires.
## Attributes
-| Name | Type | Required | Default | Valid values
| Description
|
-| ---------------------- | ------- | -------- | -------------- |
--------------------- |
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|
-| broker_list | object | True | |
| Deprecated, use `brokers` instead. List of Kafka brokers. (nodes).
|
-| brokers | array | True | |
| List of Kafka brokers (nodes).
|
-| brokers.host | string | True | |
| The host of Kafka broker, e.g, `192.168.1.1`.
|
-| brokers.port | integer | True | | [0, 65535]
| The port of Kafka broker
|
-| brokers.sasl_config | object | False | |
| The sasl config of Kafka broker
|
-| brokers.sasl_config.mechanism | string | False | "PLAIN" |
["PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512"] | The mechaism of
sasl config
|
-| brokers.sasl_config.user | string | True | |
| The user of sasl_config. If sasl_config exists, it's
required.
|
-| brokers.sasl_config.password | string | True | |
| The password of sasl_config. If sasl_config exists, it's
required.
|
-| kafka_topic | string | True | |
| Target topic to push the logs for organisation.
|
-| producer_type | string | False | async | ["async",
"sync"] | Message sending mode of the producer.
|
-| required_acks | integer | False | 1 | [1, -1]
| Number of acknowledgements the leader needs to receive for the producer
to consider the request complete. This controls the durability of the sent
records. The attribute follows the same configuration as the Kafka `acks`
attribute. `required_acks` cannot be 0. See [Apache Kafka
documentation](https://kafka.apache.org/documentation/#producerconfigs_acks)
for more. |
-| key | string | False | |
| Key used for allocating partitions for messages.
|
-| timeout | integer | False | 3 | [1,...]
| Timeout for the upstream to send data.
|
-| name | string | False | "kafka logger" |
| Unique identifier for the batch processor. If you use Prometheus to
monitor APISIX metrics, the name is exported in `apisix_batch_process_entries`.
[...]
-| meta_format | enum | False | "default" |
["default","origin"] | Format to collect the request information. Setting to
`default` collects the information in JSON format and `origin` collects the
information with the original HTTP request. See
[examples](#meta_format-example) below.
|
-| log_format | object | False | | | Log format declared as
key-value pairs in JSON. Values support strings and nested objects (up to five
levels deep; deeper fields are truncated). Within strings,
[APISIX](../apisix-variable.md) or
[NGINX](http://nginx.org/en/docs/varindex.html) variables can be referenced by
prefixing with `$`. |
-| include_req_body | boolean | False | false | [false, true]
| When set to `true` includes the request body in the log. If the
request body is too big to be kept in the memory, it can't be logged due to
Nginx's limitations.
|
-| include_req_body_expr | array | False | |
| Filter for when the `include_req_body` attribute is set to `true`.
Request body is only logged when the expression set here evaluates to `true`.
See [lua-resty-expr](https://github.com/api7/lua-resty-expr) for more.
|
-| max_req_body_bytes | integer | False | 524288 | >=1
| Maximum request body allowed in bytes. Request bodies falling within
this limit will be pushed to Kafka. If the size exceeds the configured value,
the body will be truncated before being pushed to Kafka.
|
-| include_resp_body | boolean | False | false | [false, true]
| When set to `true` includes the response body in the log.
|
-| include_resp_body_expr | array | False | |
| Filter for when the `include_resp_body` attribute is set to `true`.
Response body is only logged when the expression set here evaluates to `true`.
See [lua-resty-expr](https://github.com/api7/lua-resty-expr) for more.
|
-| max_resp_body_bytes | integer | False | 524288 | >=1
| Maximum response body allowed in bytes. Response bodies falling
within this limit will be pushed to Kafka. If the size exceeds the configured
value, the body will be truncated before being pushed to Kafka.
|
-| cluster_name | integer | False | 1 | [0,...]
| Name of the cluster. Used when there are two or more Kafka clusters.
Only works if the `producer_type` attribute is set to `async`.
|
-| producer_batch_num | integer | optional | 200 | [1,...]
| `batch_num` parameter in
[lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka). The merge
message and batch is send to the server. Unit is message count.
[...]
-| producer_batch_size | integer | optional | 1048576 | [0,...]
| `batch_size` parameter in
[lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) in bytes.
[...]
-| producer_max_buffering | integer | optional | 50000 | [1,...]
| `max_buffering` parameter in
[lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) representing
maximum buffer size. Unit is message count.
[...]
-| producer_time_linger | integer | optional | 1 | [1,...]
| `flush_time` parameter in
[lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) in seconds.
[...]
-| meta_refresh_interval | integer | optional | 30 | [1,...]
| `refresh_interval` parameter in
[lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) specifies the
time to auto refresh the metadata, in seconds.
[...]
+| Name | Type | Required | Default |
Valid values | Description
|
+| -------------------------------- | ------- | -------- | -------------- |
------------------------------------------------- |
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|
+| broker_list | object | False | |
| Deprecated, use `brokers`
instead. List of Kafka brokers (nodes).
|
+| brokers | array | True | |
| List of Kafka brokers (nodes).
|
+| brokers.host | string | True | |
| The host of Kafka broker, e.g.
`192.168.1.1`.
|
+| brokers.port | integer | True | | [1,
65535] | The port of Kafka broker.
|
+| brokers.sasl_config | object | False | |
| The SASL config of Kafka
broker.
|
+| brokers.sasl_config.mechanism | string | False | "PLAIN" |
["PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512"] | The mechanism of SASL
config.
|
+| brokers.sasl_config.user | string | True | |
| The user of `sasl_config`.
Required if `sasl_config` is configured.
|
+| brokers.sasl_config.password | string | True | |
| The password of `sasl_config`.
Required if `sasl_config` is configured.
|
+| kafka_topic | string | True | |
| Target topic to push the logs.
|
+| producer_type | string | False | async |
["async", "sync"] | Message sending mode of the
producer.
|
+| required_acks | integer | False | 1 | [1,
-1] | Number of acknowledgements the
leader needs to receive for the producer to consider the request complete. This
controls the durability of the sent records. The attribute follows the same
configuration as the Kafka `acks` attribute. `required_acks` cannot be 0. See
[Apache Kafka
documentation](https://kafka.apache.org/documentation/#producerconfigs_acks)
for more. |
+| key | string | False | |
| Key used for allocating
partitions for messages.
|
+| timeout | integer | False | 3 |
[1,...] | Timeout in seconds for the
upstream to send data.
|
+| name | string | False | "kafka logger" |
| Unique identifier for the batch
processor. If you use Prometheus to monitor APISIX metrics, the name is
exported in `apisix_batch_process_entries`.
|
+| meta_format | enum | False | "default" |
["default","origin"] | Format to collect the
request information. Setting to `default` collects the information in JSON
format and `origin` collects the information with the original HTTP request.
See [examples](#meta_format-example) below.
|
+| log_format | object | False | |
| Log format declared as
key-value pairs in JSON. Values support strings and nested objects (up to five
levels deep; deeper fields are truncated). Within strings,
[APISIX](../apisix-variable.md) or
[NGINX](http://nginx.org/en/docs/varindex.html) variables can be referenced by
prefixing with `$`. |
+| include_req_body | boolean | False | false |
[false, true] | When set to `true` includes
the request body in the log. If the request body is too big to be kept in the
memory, it can't be logged due to NGINX's limitations.
|
+| include_req_body_expr | array | False | |
| Filter for when the
`include_req_body` attribute is set to `true`. Request body is only logged when
the expression set here evaluates to `true`. See
[lua-resty-expr](https://github.com/api7/lua-resty-expr) for more.
|
+| max_req_body_bytes | integer | False | 524288 | >=1
| Maximum request body size in
bytes to push to Kafka. If the size exceeds the configured value, the body will
be truncated before being pushed.
|
+| include_resp_body | boolean | False | false |
[false, true] | When set to `true` includes
the response body in the log.
|
+| include_resp_body_expr | array | False | |
| Filter for when the
`include_resp_body` attribute is set to `true`. Response body is only logged
when the expression set here evaluates to `true`. See
[lua-resty-expr](https://github.com/api7/lua-resty-expr) for more.
|
+| max_resp_body_bytes | integer | False | 524288 | >=1
| Maximum response body size in
bytes to push to Kafka. If the size exceeds the configured value, the body will
be truncated before being pushed.
|
+| cluster_name | integer | False | 1 |
[1,...] | Name of the cluster. Used
when there are two or more Kafka clusters. Only works if the `producer_type`
attribute is set to `async`.
|
+| producer_batch_num | integer | False | 200 |
[1,...] | `batch_num` parameter in
[lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka). Merges
messages and sends them in batches. Unit is message count.
|
+| producer_batch_size | integer | False | 1048576 |
[0,...] | `batch_size` parameter in
[lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) in bytes.
|
+| producer_max_buffering | integer | False | 50000 |
[1,...] | `max_buffering` parameter
in [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka)
representing maximum buffer size. Unit is message count.
|
+| producer_time_linger | integer | False | 1 |
[1,...] | `flush_time` parameter in
[lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) in seconds.
|
+| meta_refresh_interval | integer | False | 30 |
[1,...] | `refresh_interval`
parameter in [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka)
that specifies the interval to auto-refresh the metadata, in seconds.
|
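The `max_req_body_bytes` and `max_resp_body_bytes` attributes cap how much of a body is pushed to Kafka. The Python sketch below illustrates that cap, assuming the body is simply cut to its first N bytes; the helper name `truncate_body` is hypothetical and not part of APISIX, and APISIX's actual truncation logic may differ in detail.

```python
def truncate_body(body: bytes, max_bytes: int = 524288) -> bytes:
    """Return at most max_bytes bytes of body (hypothetical helper)."""
    if max_bytes < 1:
        # Mirrors the documented constraint that max_*_body_bytes is >= 1.
        raise ValueError("max_bytes must be >= 1")
    # Byte slicing may split a multi-byte UTF-8 character at the boundary.
    return body[:max_bytes]


print(truncate_body(b'{"env": "dev"}', max_bytes=8))  # b'{"env": '
```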
This Plugin supports using batch processors to aggregate and process entries (logs/data) in a batch. This avoids the need for frequently submitting the data. The batch processor submits data every `5` seconds or when the data in the queue reaches `1000`. See [Batch Processor](../batch-processor.md#configuration) for more information or setting your custom configuration.
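The size-or-time flushing rule described above can be sketched in Python as follows. This is a simplified, single-threaded illustration rather than APISIX's Lua batch processor: the class `BatchBuffer`, its `push`/`tick`/`flush` methods, and the injectable `clock` are hypothetical, and `max_size`/`interval` only stand in for the `1000`-entry and `5`-second defaults mentioned in the text.

```python
import time
from typing import Callable, List


class BatchBuffer:
    """Hypothetical sketch of a flush-on-size-or-interval buffer."""

    def __init__(self, send: Callable[[List[dict]], None],
                 max_size: int = 1000, interval: float = 5.0,
                 clock: Callable[[], float] = time.monotonic):
        self.send = send            # callback that ships one batch
        self.max_size = max_size    # flush once this many entries are queued
        self.interval = interval    # ...or once this many seconds have passed
        self.clock = clock
        self.entries: List[dict] = []
        self.last_flush = clock()

    def push(self, entry: dict) -> None:
        self.entries.append(entry)
        if len(self.entries) >= self.max_size:
            self.flush()

    def tick(self) -> None:
        # Called periodically by a timer: flush a non-empty buffer
        # when the interval has elapsed since the last flush.
        if self.entries and self.clock() - self.last_flush >= self.interval:
            self.flush()

    def flush(self) -> None:
        if self.entries:
            self.send(self.entries)
        self.entries = []
        self.last_flush = self.clock()


# Usage with a fake clock: three entries flush by size, the fourth by time.
now = [0.0]
batches: List[List[dict]] = []
buf = BatchBuffer(batches.append, max_size=3, interval=5.0, clock=lambda: now[0])
for i in range(4):
    buf.push({"n": i})
now[0] = 6.0
buf.tick()
```

Setting `max_size` to `1` makes every entry flush immediately, which is the effect the examples below rely on when they set `batch_max_size` to `1`.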
@@ -136,10 +140,10 @@ If the process is successful, it will return `true` and if it fails, returns `ni
You can also set the format of the logs by configuring the Plugin metadata. The following configurations are available:
-| Name | Type | Required | Default | Description |
-| ---------- | ------ | -------- | ----------------------------------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
-| log_format | object | False | | Log format declared as key-value pairs in JSON. Values support strings and nested objects (up to five levels deep; deeper fields are truncated). Within strings, [APISIX](../apisix-variable.md) or [NGINX](http://nginx.org/en/docs/varindex.html) variables can be referenced by prefixing with `$`. |
-| max_pending_entries | integer | False | | Maximum number of pending entries that can be buffered in batch processor before it starts dropping them. |
+| Name | Type | Required | Default | Description |
+| ------------------- | ------- | -------- | ------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
+| log_format | object | False | | Log format declared as key-value pairs in JSON. Values support strings and nested objects (up to five levels deep; deeper fields are truncated). Within strings, [APISIX](../apisix-variable.md) or [NGINX](http://nginx.org/en/docs/varindex.html) variables can be referenced by prefixing with `$`. |
+| max_pending_entries | integer | False | | Maximum number of pending entries that can be buffered in the batch processor before it starts dropping them. |
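To illustrate how a `log_format` object is resolved, the Python sketch below replaces strings that start with `$` using a dictionary of variable values and drops anything nested deeper than five levels. It is an approximation under stated assumptions: the helper `render_log_format`, mapping unknown variables to `None`, the depth-counting rule, and the restriction to whole-string `$name` values are all hypothetical, and APISIX's actual behavior may differ.

```python
from typing import Any, Dict, Optional

MAX_DEPTH = 5  # documented limit: nested objects up to five levels deep


def render_log_format(fmt: Dict[str, Any], variables: Dict[str, Any],
                      depth: int = 1) -> Optional[Dict[str, Any]]:
    """Resolve "$name" strings against `variables` (hypothetical sketch)."""
    if depth > MAX_DEPTH:
        return None  # assumption: objects below level five are dropped
    out: Dict[str, Any] = {}
    for key, value in fmt.items():
        if isinstance(value, dict):
            nested = render_log_format(value, variables, depth + 1)
            if nested is not None:
                out[key] = nested
        elif isinstance(value, str) and value.startswith("$"):
            out[key] = variables.get(value[1:])  # unknown variable -> None
        else:
            out[key] = value  # literal values are kept unchanged
    return out


fmt = {
    "host": "$host",
    "client_ip": "$remote_addr",
    "request": {"method": "$request_method", "uri": "$request_uri"},
}
variables = {"host": "localhost", "remote_addr": "127.0.0.1",
             "request_method": "GET", "request_uri": "/hello"}
print(render_log_format(fmt, variables))
```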
:::info IMPORTANT
@@ -147,9 +151,45 @@ Configuring the Plugin metadata is global in scope. This means that it will take
:::
-The example below shows how you can configure through the Admin API:
+## Examples
+
+The examples below demonstrate how to configure the `kafka-logger` Plugin for different use cases.
+
+To follow along with the examples, start a sample Kafka cluster using Docker Compose:
+
+```yaml title="docker-compose.yml"
+services:
+  zookeeper:
+    image: confluentinc/cp-zookeeper:7.8.0
+    container_name: zookeeper
+    environment:
+      ZOOKEEPER_CLIENT_PORT: 2181
+      ZOOKEEPER_TICK_TIME: 2000
+
+  kafka:
+    image: confluentinc/cp-kafka:7.8.0
+    container_name: kafka
+    depends_on:
+      - zookeeper
+    environment:
+      KAFKA_BROKER_ID: 1
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
+      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://127.0.0.1:9092
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
+    ports:
+      - "9092:9092"
+```
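The two entries in `KAFKA_ADVERTISED_LISTENERS` explain why the examples point APISIX at `127.0.0.1:9092`: assuming APISIX runs on the host, it reaches the broker through the published `PLAINTEXT_HOST` listener, while other containers on the Compose network would use `kafka:29092`. The small Python sketch below (the helper name `parse_listeners` is hypothetical) splits such a setting into listener names and addresses.

```python
from typing import Dict, Tuple


def parse_listeners(value: str) -> Dict[str, Tuple[str, int]]:
    """Map each listener name to (host, port) for a NAME://host:port list."""
    listeners: Dict[str, Tuple[str, int]] = {}
    for item in value.split(","):
        name, _, address = item.strip().partition("://")
        host, _, port = address.rpartition(":")
        listeners[name] = (host, int(port))
    return listeners


advertised = "PLAINTEXT://kafka:29092,PLAINTEXT_HOST://127.0.0.1:9092"
for name, (host, port) in parse_listeners(advertised).items():
    print(f"{name}: {host}:{port}")
```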
+
+Start containers:
+
+```shell
+docker compose up -d
+```
:::note
+
You can fetch the `admin_key` from `config.yaml` and save to an environment variable with the following command:
```bash
@@ -158,95 +198,289 @@ admin_key=$(yq '.deployment.admin.admin_key[0].key' conf/config.yaml | sed 's/"/
:::
+### Log in Different Meta Log Formats
+
+The following example demonstrates how to enable the `kafka-logger` Plugin on a Route, which logs client requests and pushes the logs to Kafka. It also shows the differences between the `default` and `origin` meta log formats.
+
+In a separate terminal, wait for messages in the configured Kafka topic:
+
```shell
-curl http://127.0.0.1:9180/apisix/admin/plugin_metadata/kafka-logger -H "X-API-KEY: $admin_key" -X PUT -d '
+docker exec -it kafka kafka-console-consumer --bootstrap-server localhost:9092 --topic test2 --from-beginning
+```
+
+Open a new terminal session for the following steps.
+
+Create a Route with `kafka-logger`. Set `meta_format` to the `default` log format, and set `batch_max_size` to `1` to send the log entry immediately:
+
+```shell
+curl "http://127.0.0.1:9180/apisix/admin/routes" -X PUT \
+  -H "X-API-KEY: ${admin_key}" \
+  -d '{
+    "id": "kafka-logger-route",
+    "uri": "/get",
+    "plugins": {
+      "kafka-logger": {
+        "meta_format": "default",
+        "brokers": [
+          {
+            "host": "127.0.0.1",
+            "port": 9092
+          }
+        ],
+        "kafka_topic": "test2",
+        "key": "key1",
+        "batch_max_size": 1
+      }
+    },
+    "upstream": {
+      "nodes": {
+        "httpbin.org:80": 1
+      },
+      "type": "roundrobin"
+    }
+  }'
+```
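For readers who script this step instead of using `curl`, the Python sketch below builds the same Admin API request with the standard library's `urllib.request`. The function name `build_route_request` is hypothetical, and reading the key from an `ADMIN_KEY` environment variable is an assumption (the shell examples use `${admin_key}`). The request is only constructed here; passing it to `urllib.request.urlopen` would send it to a running APISIX instance.

```python
import json
import os
import urllib.request


def build_route_request(admin_key: str,
                        meta_format: str = "default") -> urllib.request.Request:
    """Build the PUT request that creates kafka-logger-route (sketch)."""
    route = {
        "id": "kafka-logger-route",
        "uri": "/get",
        "plugins": {
            "kafka-logger": {
                "meta_format": meta_format,
                "brokers": [{"host": "127.0.0.1", "port": 9092}],
                "kafka_topic": "test2",
                "key": "key1",
                "batch_max_size": 1,  # send each log entry immediately
            }
        },
        "upstream": {"nodes": {"httpbin.org:80": 1}, "type": "roundrobin"},
    }
    return urllib.request.Request(
        "http://127.0.0.1:9180/apisix/admin/routes",
        data=json.dumps(route).encode("utf-8"),
        headers={"X-API-KEY": admin_key, "Content-Type": "application/json"},
        method="PUT",
    )


req = build_route_request(os.environ.get("ADMIN_KEY", "your-admin-key"))
# urllib.request.urlopen(req) would send it to a running APISIX instance.
```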
+
+Send a request to the Route to generate a log entry:
+
+```shell
+curl -i "http://127.0.0.1:9080/get"
+```
+
+You should see an `HTTP/1.1 200 OK` response.
+
+You should see a log entry in the Kafka topic similar to the following:
+
+```json
{
-    "log_format": {
-        "host": "$host",
-        "@timestamp": "$time_iso8601",
-        "client_ip": "$remote_addr",
-        "request": { "method": "$request_method", "uri": "$request_uri" },
-        "response": { "status": "$status" }
+  "latency": 411.00001335144,
+  "request": {
+    "querystring": {},
+    "headers": {
+      "host": "127.0.0.1:9080",
+      "user-agent": "curl/8.7.1",
+      "accept": "*/*",
+      "x-forwarded-proto": "http",
+      "x-forwarded-host": "127.0.0.1",
+      "x-forwarded-port": "9080"
+    },
+    "method": "GET",
+    "size": 83,
+    "uri": "/get",
+    "url": "http://127.0.0.1:9080/get"
+  },
+  "response": {
+    "headers": {
+      "content-length": "233",
+      "access-control-allow-credentials": "true",
+      "content-type": "application/json",
+      "connection": "close",
+      "access-control-allow-origin": "*",
+      "date": "Fri, 10 Nov 2023 06:02:44 GMT",
+      "server": "APISIX/3.16.0"
+    },
+    "status": 200,
+    "size": 475
+  },
+  "route_id": "kafka-logger-route",
+  "client_ip": "127.0.0.1",
+  "server": {
+    "hostname": "apisix",
+    "version": "3.16.0"
+  },
+  "apisix_latency": 18.00001335144,
+  "service_id": "",
+  "upstream_latency": 393,
+  "start_time": 1699596164550,
+  "upstream": "54.90.18.68:80"
+}
+```
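In the sample entry above, `apisix_latency` (18.00001335144) equals `latency` (411.00001335144) minus `upstream_latency` (393). A consumer can use that relationship to estimate the time spent in the gateway itself. The Python sketch below parses a trimmed copy of the entry; the helper name `gateway_overhead` is hypothetical, and treating the values as milliseconds is an assumption based on their magnitudes.

```python
import json

# Trimmed copy of the sample log entry shown above.
entry_text = """{"latency": 411.00001335144, "upstream_latency": 393,
"apisix_latency": 18.00001335144, "route_id": "kafka-logger-route",
"response": {"status": 200}}"""


def gateway_overhead(entry: dict) -> float:
    """Latency not attributed to the upstream (same units as `latency`)."""
    return entry["latency"] - entry["upstream_latency"]


entry = json.loads(entry_text)
overhead = gateway_overhead(entry)
print(f"{entry['route_id']}: status={entry['response']['status']}, "
      f"gateway overhead={overhead:.2f}")
```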
+
+Update the meta log format to `origin`:
+
+```shell
+curl "http://127.0.0.1:9180/apisix/admin/routes/kafka-logger-route" -X PATCH \
+  -H "X-API-KEY: ${admin_key}" \
+  -d '{
+    "plugins": {
+      "kafka-logger": {
+        "meta_format": "origin"
+      }
    }
-}'
+  }'
```
-With this configuration, your logs would be formatted as shown below:
+Send a request to the Route again to generate a new log entry:
```shell
-{"host":"localhost","@timestamp":"2020-09-23T19:05:05-04:00","client_ip":"127.0.0.1","request":{"method":"GET","uri":"/hello"},"response":{"status":200},"route_id":"1"}
-{"host":"localhost","@timestamp":"2020-09-23T19:05:05-04:00","client_ip":"127.0.0.1","request":{"method":"GET","uri":"/hello"},"response":{"status":200},"route_id":"1"}
+curl -i "http://127.0.0.1:9080/get"
```
-## Enable Plugin
+You should see an `HTTP/1.1 200 OK` response.
+
+You should see a log entry in the Kafka topic similar to the following:
+
+```text
+GET /get HTTP/1.1
+x-forwarded-proto: http
+x-forwarded-host: 127.0.0.1
+user-agent: curl/8.7.1
+x-forwarded-port: 9080
+host: 127.0.0.1:9080
+accept: */*
+```
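With `meta_format` set to `origin`, the message in the sample above holds the raw request line and headers rather than a JSON object, so a consumer has to parse it itself. A minimal Python sketch, assuming CRLF or LF line endings and ignoring any body after the header block (the function name `parse_origin_entry` is hypothetical):

```python
from typing import Dict, Tuple


def parse_origin_entry(raw: str) -> Tuple[str, str, Dict[str, str]]:
    """Split an origin-format entry into method, path, and headers."""
    lines = raw.replace("\r\n", "\n").split("\n")
    method, path, _version = lines[0].split(" ", 2)
    headers: Dict[str, str] = {}
    for line in lines[1:]:
        if not line:
            break  # a blank line ends the header block
        name, _, value = line.partition(":")
        headers[name.strip().lower()] = value.strip()
    return method, path, headers


raw = "GET /get HTTP/1.1\r\nhost: 127.0.0.1:9080\r\naccept: */*\r\n\r\n"
method, path, headers = parse_origin_entry(raw)
print(method, path, headers["host"])
```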
-The example below shows how you can enable the `kafka-logger` Plugin on a specific Route:
+### Log Request and Response Headers With Plugin Metadata
+
+The following example demonstrates how to customize the log format using [plugin metadata](../terminology/plugin-metadata.md) and [built-in variables](../apisix-variable.md) to log specific headers from the request and response.
+
+Plugin metadata is used to configure the common metadata fields of all Plugin instances of the same Plugin. It is useful when a Plugin is enabled across multiple resources and requires a universal update to their metadata fields.
+
+First, create a Route with `kafka-logger`. Set `meta_format` to `default` (required for a custom log format via plugin metadata) and `batch_max_size` to `1` to send log entries immediately:
```shell
-curl http://127.0.0.1:9180/apisix/admin/routes/5 -H "X-API-KEY: $admin_key" -X PUT -d '
-{
+curl "http://127.0.0.1:9180/apisix/admin/routes" -X PUT \
+  -H "X-API-KEY: ${admin_key}" \
+  -d '{
+    "id": "kafka-logger-route",
+    "uri": "/get",
    "plugins": {
-        "kafka-logger": {
-            "brokers" : [
-                {
-                    "host" :"127.0.0.1",
-                    "port" : 9092
-                }
-            ],
-            "kafka_topic" : "test2",
-            "key" : "key1",
-            "batch_max_size": 1,
-            "name": "kafka logger"
-        }
+      "kafka-logger": {
+        "meta_format": "default",
+        "brokers": [
+          {
+            "host": "127.0.0.1",
+            "port": 9092
+          }
+        ],
+        "kafka_topic": "test2",
+        "key": "key1",
+        "batch_max_size": 1
+      }
    },
    "upstream": {
-        "nodes": {
-            "127.0.0.1:1980": 1
-        },
-        "type": "roundrobin"
-    },
-    "uri": "/hello"
-}'
+      "nodes": {
+        "httpbin.org:80": 1
+      },
+      "type": "roundrobin"
+    }
+  }'
```
-This Plugin also supports pushing to more than one broker at a time. You can specify multiple brokers in the Plugin configuration as shown below:
+:::note
+
+If `meta_format` is set to `origin`, log entries remain in `origin` format regardless of the log format configured in the plugin metadata.
+
+:::
+
+Next, configure the Plugin metadata for `kafka-logger` to log the custom request header `env` and the response header `Content-Type`:
+
+```shell
+curl "http://127.0.0.1:9180/apisix/admin/plugin_metadata/kafka-logger" -X PUT \
+  -H "X-API-KEY: ${admin_key}" \
+  -d '{
+    "log_format": {
+      "host": "$host",
+      "@timestamp": "$time_iso8601",
+      "client_ip": "$remote_addr",
+      "env": "$http_env",
+      "resp_content_type": "$sent_http_Content_Type"
+    }
+  }'
+```
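The `$http_env` and `$sent_http_Content_Type` values above follow the NGINX convention for header variables: `$http_<name>` reads a request header and `$sent_http_<name>` reads a response header, where `<name>` is the header field name converted to lower case with dashes replaced by underscores. The helper below is hypothetical and only illustrates how those variable names are derived.

```python
def nginx_header_variable(header: str, response: bool = False) -> str:
    """Return the NGINX variable name that reads `header` (illustrative)."""
    prefix = "$sent_http_" if response else "$http_"
    # Lower-case the field name and turn dashes into underscores.
    return prefix + header.lower().replace("-", "_")


log_format = {
    "env": nginx_header_variable("env"),
    "resp_content_type": nginx_header_variable("Content-Type", response=True),
}
print(log_format)
```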
+
+Send a request to the Route with the `env` header:
+
+```shell
+curl -i "http://127.0.0.1:9080/get" -H "env: dev"
+```
+
+You should see a log entry in the Kafka topic similar to the following:
```json
-"brokers" : [
-    {
-        "host" :"127.0.0.1",
-        "port" : 9092
+{
+  "@timestamp": "2023-11-10T23:09:04+00:00",
+  "host": "127.0.0.1",
+  "client_ip": "127.0.0.1",
+  "route_id": "kafka-logger-route",
+  "env": "dev",
+  "resp_content_type": "application/json"
+}
+```
+
+### Log Request Bodies Conditionally
+
+The following example demonstrates how to conditionally log request bodies.
+
+Create a Route with `kafka-logger`. Set `include_req_body` to `true` to include the request body, and set `include_req_body_expr` to only include the body when the URL query argument `log_body` equals `yes`:
+
+```shell
+curl "http://127.0.0.1:9180/apisix/admin/routes" -X PUT \
+  -H "X-API-KEY: ${admin_key}" \
+  -d '{
+    "id": "kafka-logger-route",
+    "uri": "/post",
+    "plugins": {
+      "kafka-logger": {
+        "brokers": [
+          {
+            "host": "127.0.0.1",
+            "port": 9092
+          }
+        ],
+        "kafka_topic": "test2",
+        "key": "key1",
+        "batch_max_size": 1,
+        "include_req_body": true,
+        "include_req_body_expr": [["arg_log_body", "==", "yes"]]
+      }
    },
-    {
-        "host" :"127.0.0.1",
-        "port" : 9093
+    "upstream": {
+      "nodes": {
+        "httpbin.org:80": 1
+      },
+      "type": "roundrobin"
    }
-],
+  }'
```
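The `include_req_body_expr` value `[["arg_log_body", "==", "yes"]]` is a list of `[variable, operator, value]` conditions evaluated by lua-resty-expr, where `arg_log_body` refers to the `log_body` query argument. The Python sketch below approximates that check for the `==` and `~=` operators only, assuming every condition in the list must hold; the function name `should_log_body` is hypothetical, and the real library supports many more operators and logical combinators.

```python
from typing import Dict, List
from urllib.parse import parse_qs, urlsplit


def should_log_body(url: str, expr: List[List[str]]) -> bool:
    """Approximate include_req_body_expr for arg_* variables (sketch)."""
    query = parse_qs(urlsplit(url).query)
    # Assumption: arg_<name> resolves to the first value of query arg <name>.
    args: Dict[str, str] = {f"arg_{k}": v[0] for k, v in query.items()}
    for var, op, expected in expr:
        if op not in ("==", "~="):
            raise ValueError(f"operator {op!r} is not covered by this sketch")
        actual = args.get(var)
        if op == "==" and actual != expected:
            return False
        if op == "~=" and actual == expected:
            return False
    return True  # every condition held


expr = [["arg_log_body", "==", "yes"]]
print(should_log_body("http://127.0.0.1:9080/post?log_body=yes", expr))  # True
print(should_log_body("http://127.0.0.1:9080/post", expr))               # False
```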
-## Example usage
-
-Now, if you make a request to APISIX, it will be logged in your Kafka server:
+Send a request to the Route with a URL query string satisfying the condition:
```shell
-curl -i http://127.0.0.1:9080/hello
+curl -i "http://127.0.0.1:9080/post?log_body=yes" -X POST -d '{"env": "dev"}'
```
-## Delete Plugin
+You should see the request body logged:
-To remove the `kafka-logger` Plugin, you can delete the corresponding JSON configuration from the Plugin configuration. APISIX will automatically reload and you do not have to restart for this to take effect.
+```json
+{
+  "...",
+  "request": {
+    "method": "POST",
+    "body": "{\"env\": \"dev\"}",
+    "size": 179
+  }
+}
+```
+
+Send another request without the URL query string:
```shell
-curl http://127.0.0.1:9180/apisix/admin/routes/1 -H "X-API-KEY: $admin_key" -X PUT -d '
+curl -i "http://127.0.0.1:9080/post" -X POST -d '{"env": "dev"}'
+```
+
+You should not observe the request body in the log.
+
+:::note
+
+If you have customized the `log_format` in addition to setting `include_req_body` or `include_resp_body` to `true`, the Plugin will not include the bodies in the logs. As a workaround, you can use the NGINX variable `$request_body` in the log format:
+
+```json
{
-    "methods": ["GET"],
-    "uri": "/hello",
-    "plugins": {},
-    "upstream": {
-        "type": "roundrobin",
-        "nodes": {
-            "127.0.0.1:1980": 1
-        }
-    }
-}'
+  "kafka-logger": {
+    "log_format": {"body": "$request_body"}
+  }
+}
```
+
+:::
diff --git a/docs/zh/latest/plugins/kafka-logger.md b/docs/zh/latest/plugins/kafka-logger.md
index bd606f180..3b3623356 100644
--- a/docs/zh/latest/plugins/kafka-logger.md
+++ b/docs/zh/latest/plugins/kafka-logger.md
@@ -5,7 +5,7 @@ keywords:
- API Gateway
- Plugin
- Kafka Logger
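-description: The kafka-logger Plugin of the API gateway Apache APISIX is used to push logs as JSON objects to Apache Kafka clusters.
+description: The kafka-logger Plugin pushes request and response logs as JSON objects to Apache Kafka clusters in batches and supports custom log formats for better data management.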
---
<!--
@@ -27,48 +27,54 @@ description: API 网关 Apache APISIX 的 kafka-logger 插件用于将日志作
#
-->
+<head>
+ <link rel="canonical" href="https://docs.api7.ai/hub/kafka-logger" />
+</head>
+
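## Description
-The `kafka-logger` Plugin is used to push logs as JSON objects to Apache Kafka clusters. It can be used as a Kafka client driver for the `ngx_lua` NGINX module.
+The `kafka-logger` Plugin pushes request and response logs as JSON objects to Apache Kafka clusters in batches and supports custom log formats.
+
+It might take some time to receive the log data. The data is sent automatically after the timer function in the [batch processor](../batch-processor.md) expires.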
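## Attributes
-| Name | Type | Required | Default | Valid values | Description |
-| ---------------------- | ------- | ------ | -------------- | --------------------- | ------------------------------------------------ |
-| broker_list | object | Yes | | | Deprecated, use the `brokers` attribute instead. Originally the list of Kafka brokers to push to. |
-| brokers | array | Yes | | | List of Kafka brokers to push to. |
-| brokers.host | string | Yes | | | Host of the Kafka broker node, e.g. `192.168.1.1` |
-| brokers.port | string | Yes | | | Port of the Kafka broker node |
-| brokers.sasl_config | object | No | | | sasl_config of the Kafka broker |
-| brokers.sasl_config.mechanism | string | No | "PLAIN" | ["PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512"] | SASL authentication mechanism of the Kafka broker |
-| brokers.sasl_config.user | string | Yes | | | User in the SASL configuration of the Kafka broker. Required if sasl_config exists |
-| brokers.sasl_config.password | string | Yes | | | Password in the SASL configuration of the Kafka broker. Required if sasl_config exists |
-| kafka_topic | string | Yes | | | Topic to push to. |
-| producer_type | string | No | async | ["async", "sync"] | Message sending mode of the producer. |
-| required_acks | integer | No | 1 | [1, -1] | Number of acknowledgements the producer needs to receive before it considers a request complete. This parameter ensures the reliability of sent requests. It is configured the same way as the Kafka `acks` attribute; see the [Apache Kafka documentation](https://kafka.apache.org/documentation/#producerconfigs_acks). required_acks does not yet support 0. |
-| key | string | No | | | Key assigned for message partitioning. |
-| timeout | integer | No | 3 | [1,...] | Timeout for sending data. |
-| name | string | No | "kafka logger" | | Unique identifier of the logger. If you use Prometheus to monitor APISIX metrics, the name is exported in `apisix_batch_process_entries`. |
-| meta_format | enum | No | "default" | ["default","origin"] | `default`: collect request information in the default JSON encoding. `origin`: collect request information as the original HTTP request. For more information, see [meta_format](#meta_format-示例). |
-| log_format | object | No | | | Log format declared as key-value pairs in JSON. Values support strings and nested objects (up to five levels; deeper fields are truncated). Within strings, [APISIX variables](../apisix-variable.md) or [NGINX built-in variables](http://nginx.org/en/docs/varindex.html) can be referenced by prefixing with `$`. |
-| include_req_body | boolean | No | false | [false, true] | When set to `true`, includes the request body. **Note**: if the request body cannot be held entirely in memory, APISIX cannot log it due to NGINX limitations. |
-| include_req_body_expr | array | No | | | Filter applied when `include_req_body` is set to `true`. The request body is logged only when the expression set here evaluates to `true`. For more information, see [lua-resty-expr](https://github.com/api7/lua-resty-expr). |
-| max_req_body_bytes | integer | No | 524288 | >=1 | Maximum request body allowed (in bytes). Request bodies within this limit are pushed to Kafka. If the size exceeds the configured value, the body is truncated before being pushed to Kafka. |
-| include_resp_body | boolean | No | false | [false, true] | When set to `true`, includes the response body. |
-| include_resp_body_expr | array | No | | | Filter applied when `include_resp_body` is set to `true`. The response body is logged only when the expression set here evaluates to `true`. For more information, see [lua-resty-expr](https://github.com/api7/lua-resty-expr). |
-| max_resp_body_bytes | integer | No | 524288 | >=1 | Maximum response body allowed (in bytes). Response bodies below this limit are pushed to Kafka. If the size exceeds the configured value, the body is truncated before being pushed to Kafka. |
-| cluster_name | integer | No | 1 | [0,...] | Name of the Kafka cluster, used when there are two or more Kafka clusters. Can only be used when `producer_type` is set to `async` mode. |
-| producer_batch_num | integer | No | 200 | [1,...] | Corresponds to the `batch_num` parameter in [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka). Messages are aggregated and submitted in batches. Unit is message count. |
-| producer_batch_size | integer | No | 1048576 | [0,...] | Corresponds to the `batch_size` parameter in [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka). Unit is bytes. |
-| producer_max_buffering | integer | No | 50000 | [1,...] | Corresponds to the `max_buffering` parameter in [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka), representing the maximum buffer size. Unit is message count. |
-| producer_time_linger | integer | No | 1 | [1,...] | Corresponds to the `flush_time` parameter in [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka). Unit is seconds. |
-| meta_refresh_interval | integer | No | 30 | [1,...] | Corresponds to the `refresh_interval` parameter in [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka), specifying the interval for automatically refreshing metadata, in seconds. |
-
-This Plugin supports using a batch processor to aggregate and process entries (logs/data) in batches. This prevents the Plugin from submitting data frequently. By default, the batch processor submits data every `5` seconds or when the data in the queue reaches `1000` entries. For the batch processor parameters, see the configuration section of [Batch-Processor](../batch-processor.md#配置).
-
-:::tip Tip
-
-Data is first written to a buffer. When the buffer exceeds the value set by `batch_max_size` or `buffer_duration`, the data is sent to the Kafka server and the buffer is flushed.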
+| Name | Type | Required | Default | Valid values | Description |
+| ---- | ---- | -------- | ------- | ------------ | ----------- |
+| broker_list | object | False | | | Deprecated. Use the `brokers` attribute instead. List of Kafka brokers to push logs to. |
+| brokers | array | True | | | List of Kafka brokers to push logs to. |
+| brokers.host | string | True | | | Host address of the Kafka broker, for example `192.168.1.1`. |
+| brokers.port | integer | True | | [1, 65535] | Port of the Kafka broker. |
+| brokers.sasl_config | object | False | | | SASL configuration of the Kafka broker. |
+| brokers.sasl_config.mechanism | string | False | "PLAIN" | ["PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512"] | SASL authentication mechanism. |
+| brokers.sasl_config.user | string | True | | | Username for SASL authentication. Required if `sasl_config` is configured. |
+| brokers.sasl_config.password | string | True | | | Password for SASL authentication. Required if `sasl_config` is configured. |
+| kafka_topic | string | True | | | Target topic to push logs to. |
+| producer_type | string | False | async | ["async", "sync"] | Message-sending mode of the producer. |
+| required_acks | integer | False | 1 | [1, -1] | Number of acknowledgements the producer must receive before it considers a request complete, which controls the reliability of sent requests. It behaves like the Kafka `acks` setting, except that `required_acks` cannot be `0`. See the [Apache Kafka documentation](https://kafka.apache.org/documentation/#producerconfigs_acks) for details. |
+| key | string | False | | | Key used for message partitioning. |
+| timeout | integer | False | 3 | [1,...] | Timeout for sending data, in seconds. |
+| name | string | False | "kafka logger" | | Unique identifier of the batch processor. If you use Prometheus to monitor APISIX metrics, the name is exported in `apisix_batch_process_entries`. |
+| meta_format | enum | False | "default" | ["default","origin"] | Format used to collect request information. `default` collects the information in JSON format; `origin` collects it in the original HTTP request format. See the [example](#meta_format-示例) below for details. |
+| log_format | object | False | | | Log format declared as JSON key-value pairs. Values support strings and nested objects (up to five levels deep; deeper fields are truncated). In strings, [APISIX variables](../apisix-variable.md) or [NGINX variables](http://nginx.org/en/docs/varindex.html) can be referenced by prefixing them with `$`. |
+| include_req_body | boolean | False | false | [false, true] | If `true`, include the request body in the log. **Note**: if the request body is too large to be kept in memory, it cannot be logged due to NGINX limitations. |
+| include_req_body_expr | array | False | | | Filter applied when `include_req_body` is `true`. The request body is logged only if the expression set here evaluates to `true`. See [lua-resty-expr](https://github.com/api7/lua-resty-expr) for details. |
+| max_req_body_bytes | integer | False | 524288 | >=1 | Maximum request body size, in bytes, allowed to be pushed to Kafka. A larger body is truncated before being pushed. |
+| include_resp_body | boolean | False | false | [false, true] | If `true`, include the response body in the log. |
+| include_resp_body_expr | array | False | | | Filter applied when `include_resp_body` is `true`. The response body is logged only if the expression set here evaluates to `true`. See [lua-resty-expr](https://github.com/api7/lua-resty-expr) for details. |
+| max_resp_body_bytes | integer | False | 524288 | >=1 | Maximum response body size, in bytes, allowed to be pushed to Kafka. A larger body is truncated before being pushed. |
+| cluster_name | integer | False | 1 | [1,...] | Name of the Kafka cluster, used when there are two or more Kafka clusters. Only takes effect if `producer_type` is `async`. |
+| producer_batch_num | integer | False | 200 | [1,...] | Corresponds to the `batch_num` parameter in [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka). Messages are aggregated and submitted in batches; the unit is the number of messages. |
+| producer_batch_size | integer | False | 1048576 | [0,...] | Corresponds to the `batch_size` parameter in [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka), in bytes. |
+| producer_max_buffering | integer | False | 50000 | [1,...] | Corresponds to the `max_buffering` parameter in [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka): the maximum buffer size, in number of messages. |
+| producer_time_linger | integer | False | 1 | [1,...] | Corresponds to the `flush_time` parameter in [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka), in seconds. |
+| meta_refresh_interval | integer | False | 30 | [1,...] | Corresponds to the `refresh_interval` parameter in [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka): the interval, in seconds, at which metadata is automatically refreshed. |
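The constraints in the attribute table can also be checked client-side before a configuration is sent to the Admin API. The following Python sketch is illustrative only: `validate_kafka_logger_conf` is a hypothetical helper that covers a subset of the documented rules (required fields, the port range, SASL fields, and the allowed values of `required_acks`, `producer_type`, and `meta_format`). APISIX performs its own, authoritative schema validation.

```python
from typing import Any

SASL_MECHANISMS = ("PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512")


def validate_kafka_logger_conf(conf: dict[str, Any]) -> list[str]:
    """Hypothetical helper: return the problems found in a kafka-logger config.

    Only a subset of the documented constraints is checked.
    """
    errors: list[str] = []

    if "kafka_topic" not in conf:
        errors.append("kafka_topic is required")

    brokers = conf.get("brokers")
    if not brokers:
        errors.append("brokers is required")
    for i, broker in enumerate(brokers or []):
        if "host" not in broker:
            errors.append(f"brokers[{i}].host is required")
        port = broker.get("port")
        if not isinstance(port, int) or not 1 <= port <= 65535:
            errors.append(f"brokers[{i}].port must be in [1, 65535]")
        sasl = broker.get("sasl_config")
        if sasl is not None:
            # user and password become mandatory once sasl_config is present
            for field in ("user", "password"):
                if field not in sasl:
                    errors.append(f"brokers[{i}].sasl_config.{field} is required")
            if sasl.get("mechanism", "PLAIN") not in SASL_MECHANISMS:
                errors.append(f"brokers[{i}].sasl_config.mechanism is invalid")

    # Same meaning as Kafka's acks setting, but 0 is not allowed.
    if conf.get("required_acks", 1) not in (1, -1):
        errors.append("required_acks must be 1 or -1")
    if conf.get("producer_type", "async") not in ("async", "sync"):
        errors.append("producer_type must be async or sync")
    if conf.get("meta_format", "default") not in ("default", "origin"):
        errors.append("meta_format must be default or origin")

    return errors
```

For example, `{"brokers": [{"host": "127.0.0.1", "port": 9092}], "kafka_topic": "test2"}` yields an empty list, while adding `"required_acks": 0` yields `["required_acks must be 1 or -1"]`.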
+
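+This Plugin supports using batch processors to aggregate and process entries (logs/data) in batches, which avoids frequent data submissions. By default, the batch processor submits data every `5` seconds or when the queue reaches `1000` entries. For batch processor parameter settings, see the configuration section of [Batch Processor](../batch-processor.md#配置).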
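The flush rule described above (submit when a size threshold is reached or when a time interval elapses, whichever comes first) can be sketched in Python as follows. `BatchBuffer` is a hypothetical, simplified stand-in, not APISIX's batch processor, which is written in Lua and exposes more parameters (see the batch processor documentation). A `tick()` method replaces the real timer, and the clock is injectable so the behavior can be tested deterministically.

```python
import time
from typing import Any, Callable, List, Optional


class BatchBuffer:
    """Hypothetical, simplified batch buffer (not APISIX's implementation).

    Entries are buffered and handed to `flush_fn` as one batch once `max_size`
    entries have accumulated, or once `interval` seconds have passed since the
    first entry of the current batch was buffered.
    """

    def __init__(
        self,
        flush_fn: Callable[[List[Any]], None],
        max_size: int = 1000,
        interval: float = 5.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.flush_fn = flush_fn
        self.max_size = max_size
        self.interval = interval
        self.clock = clock
        self.entries: List[Any] = []
        self.first_at: Optional[float] = None

    def push(self, entry: Any) -> None:
        if not self.entries:
            self.first_at = self.clock()  # start timing the new batch
        self.entries.append(entry)
        if len(self.entries) >= self.max_size:
            self.flush()

    def tick(self) -> None:
        """Stand-in for the timer: flush if the current batch is old enough."""
        if self.entries and self.clock() - self.first_at >= self.interval:
            self.flush()

    def flush(self) -> None:
        batch, self.entries, self.first_at = self.entries, [], None
        self.flush_fn(batch)
```

With `max_size=1`, every `push()` flushes immediately, which roughly mirrors the effect of setting `batch_max_size` to `1`.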
+
+:::info Important
+
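+Data is first written to a buffer. When the buffer exceeds the `batch_max_size` or `buffer_duration` value, the data is sent to the Kafka server and the buffer is flushed.
If the data is sent successfully, `true` is returned. If an error occurs, `nil` is returned along with the error description string `buffer overflow`.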
@@ -78,76 +84,113 @@ description: The kafka-logger Plugin of the API gateway Apache APISIX is used to take logs as
- `default`:
- ```json
- {
- "upstream": "127.0.0.1:1980",
- "start_time": 1619414294760,
- "client_ip": "127.0.0.1",
- "service_id": "",
- "route_id": "1",
- "request": {
- "querystring": {
- "ab": "cd"
- },
- "size": 90,
- "uri": "/hello?ab=cd",
- "url": "http://localhost:1984/hello?ab=cd",
- "headers": {
- "host": "localhost",
- "content-length": "6",
- "connection": "close"
- },
- "body": "abcdef",
- "method": "GET"
- },
- "response": {
- "headers": {
- "connection": "close",
- "content-type": "text/plain; charset=utf-8",
- "date": "Mon, 26 Apr 2021 05:18:14 GMT",
- "server": "APISIX/2.5",
- "transfer-encoding": "chunked"
- },
- "size": 190,
- "status": 200
- },
- "server": {
- "hostname": "localhost",
- "version": "2.5"
- },
- "latency": 0
- }
- ```
+ ```json
+ {
+ "upstream": "127.0.0.1:1980",
+ "start_time": 1619414294760,
+ "client_ip": "127.0.0.1",
+ "service_id": "",
+ "route_id": "1",
+ "request": {
+ "querystring": {
+ "ab": "cd"
+ },
+ "size": 90,
+ "uri": "/hello?ab=cd",
+ "url": "http://localhost:1984/hello?ab=cd",
+ "headers": {
+ "host": "localhost",
+ "content-length": "6",
+ "connection": "close"
+ },
+ "body": "abcdef",
+ "method": "GET"
+ },
+ "response": {
+ "headers": {
+ "connection": "close",
+ "content-type": "text/plain; charset=utf-8",
+ "date": "Mon, 26 Apr 2021 05:18:14 GMT",
+ "server": "APISIX/2.5",
+ "transfer-encoding": "chunked"
+ },
+ "size": 190,
+ "status": 200
+ },
+ "server": {
+ "hostname": "localhost",
+ "version": "2.5"
+ },
+ "latency": 0
+ }
+ ```
- `origin`:
- ```http
- GET /hello?ab=cd HTTP/1.1
- host: localhost
- content-length: 6
- connection: close
+ ```http
+ GET /hello?ab=cd HTTP/1.1
+ host: localhost
+ content-length: 6
+ connection: close
- abcdef
- ```
+ abcdef
+ ```
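To make the difference between the two formats concrete, the following Python sketch renders request parts as raw HTTP/1.1 text in the shape of the `origin` sample above. `to_origin_format` is a hypothetical helper; it assumes CRLF line endings as in HTTP/1.1 messages and does not claim to reproduce APISIX's exact byte output.

```python
from typing import Mapping


def to_origin_format(method: str, uri: str, headers: Mapping[str, str], body: str = "") -> str:
    """Hypothetical helper: render a request as raw HTTP/1.1 text."""
    lines = [f"{method} {uri} HTTP/1.1"]  # request line
    lines += [f"{name}: {value}" for name, value in headers.items()]
    # An empty line separates the header section from the body.
    return "\r\n".join(lines) + "\r\n\r\n" + body
```

Calling it with `"GET"`, `"/hello?ab=cd"`, the three headers from the sample, and the body `"abcdef"` produces the `origin` sample with CRLF line endings.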
## Plugin Metadata
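-| Name | Type | Required | Default | Description |
-| ---------------- | ------- | ------ | ------------- |------------------------------------------------ |
-| log_format | object | False | | Log format declared as JSON key-value pairs. Values support strings and nested objects (up to five levels deep; deeper fields are truncated). In strings, [APISIX variables](../../../en/latest/apisix-variable.md) or [NGINX variables](http://nginx.org/en/docs/varindex.html) can be referenced by prefixing them with `$`. |
-| max_pending_entries | integer | False | | | Maximum number of pending entries that can be purchased in the batch processor before it starts deleting pending entries.|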
+You can also set the log format by configuring the Plugin metadata. The following configurations are available:
-:::note
+| Name | Type | Required | Default | Description |
+| ------------------- | ------- | -------- | ------- | ----------- |
+| log_format | object | False | | Log format declared as JSON key-value pairs. Values support strings and nested objects (up to five levels deep; deeper fields are truncated). In strings, [APISIX variables](../apisix-variable.md) or [NGINX variables](http://nginx.org/en/docs/varindex.html) can be referenced by prefixing them with `$`. |
+| max_pending_entries | integer | False | | Maximum number of pending entries that can be buffered before the batch processor starts dropping them. |
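-This configuration takes effect globally. If `log_format` is specified, all Routes or Services bound to `kafka-logger` will use this log format.
+:::info Important
+
+Plugin metadata is global in scope. This means it takes effect on all Routes and Services that use the `kafka-logger` Plugin.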
:::
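The variable substitution described for `log_format` can be sketched in Python as follows. This is an illustration under assumptions, not APISIX's code: `resolve_log_format` is hypothetical, a plain dictionary stands in for the APISIX and NGINX variables, a string is treated as a variable reference only when it starts with `$`, unknown variables become `None`, and nested objects beyond the fifth level (counting the top level as the first) are dropped. Exactly where APISIX cuts off the nesting is not asserted here.

```python
from typing import Any, Dict, Mapping

MAX_LEVELS = 5  # the documentation allows nested objects up to five levels deep


def resolve_log_format(
    fmt: Mapping[str, Any], variables: Mapping[str, Any], level: int = 1
) -> Dict[str, Any]:
    """Hypothetical helper: replace "$name" strings with values from `variables`."""
    out: Dict[str, Any] = {}
    for key, value in fmt.items():
        if isinstance(value, Mapping):
            if level < MAX_LEVELS:
                out[key] = resolve_log_format(value, variables, level + 1)
            # otherwise the nested object is too deep and is dropped
        elif isinstance(value, str) and value.startswith("$"):
            out[key] = variables.get(value[1:])
        else:
            out[key] = value  # literal values are copied as-is
    return out
```

For example, `{"host": "$host", "request": {"method": "$request_method"}}` resolved against `{"host": "localhost", "request_method": "GET"}` gives `{"host": "localhost", "request": {"method": "GET"}}`.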
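-The following example shows how to configure the Plugin metadata through the Admin API:
+## Examples
+
+The examples below demonstrate how you can configure the `kafka-logger` Plugin for different scenarios.
+
+Before following the examples, start a Kafka cluster with Docker Compose: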
+
+```yaml title="docker-compose.yml"
+services:
+  zookeeper:
+    image: confluentinc/cp-zookeeper:7.8.0
+    container_name: zookeeper
+    environment:
+      ZOOKEEPER_CLIENT_PORT: 2181
+      ZOOKEEPER_TICK_TIME: 2000
+
+  kafka:
+    image: confluentinc/cp-kafka:7.8.0
+    container_name: kafka
+    depends_on:
+      - zookeeper
+    environment:
+      KAFKA_BROKER_ID: 1
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
+      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://127.0.0.1:9092
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
+    ports:
+      - "9092:9092"
+```
+
+Start the containers:
+
+```shell
+docker compose up -d
+```
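`docker compose up -d` returns as soon as the containers start, which can be before the broker accepts connections. One way to wait is to poll the published port, as in the Python sketch below. `wait_for_port` is a hypothetical helper; a successful TCP connection only shows that something is listening on `127.0.0.1:9092`, not that Kafka has finished starting up.

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout: float = 30.0, interval: float = 0.5) -> bool:
    """Hypothetical helper: True once a TCP connection succeeds, False after `timeout` seconds."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            # Connection refused or timed out: retry until the deadline passes.
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)
```

For the setup above, `wait_for_port("127.0.0.1", 9092)` blocks until the published Kafka port accepts connections or 30 seconds pass.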
:::note
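-You can get the `admin_key` from `config.yaml` and save it to an environment variable with the following command:
+You can get the `admin_key` from `config.yaml` and save it to an environment variable with the following command: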
```bash
admin_key=$(yq '.deployment.admin.admin_key[0].key' conf/config.yaml | sed 's/"//g')
@@ -155,96 +198,289 @@ admin_key=$(yq '.deployment.admin.admin_key[0].key' conf/config.yaml | sed 's/"/
:::
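+### Log in Different Meta Log Formats
+
+The following example demonstrates how to enable the `kafka-logger` Plugin on a Route to log client requests and push the logs to Kafka. It also shows the differences between the `default` and `origin` meta log formats.
+
+In a separate terminal, wait for messages in the configured Kafka topic: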
+
```shell
-curl http://127.0.0.1:9180/apisix/admin/plugin_metadata/kafka-logger \
--H "X-API-KEY: $admin_key" -X PUT -d '
+docker exec -it kafka kafka-console-consumer --bootstrap-server localhost:9092 --topic test2 --from-beginning
+```
+
+Open a new terminal and perform the following steps.
+
+Create a Route with the `kafka-logger` Plugin enabled. Set `meta_format` to the `default` log format and set `batch_max_size` to `1` to send log entries immediately:
+
+```shell
+curl "http://127.0.0.1:9180/apisix/admin/routes" -X PUT \
+ -H "X-API-KEY: ${admin_key}" \
+ -d '{
+ "id": "kafka-logger-route",
+ "uri": "/get",
+ "plugins": {
+ "kafka-logger": {
+ "meta_format": "default",
+ "brokers": [
+ {
+ "host": "127.0.0.1",
+ "port": 9092
+ }
+ ],
+ "kafka_topic": "test2",
+ "key": "key1",
+ "batch_max_size": 1
+ }
+ },
+ "upstream": {
+ "nodes": {
+ "httpbin.org:80": 1
+ },
+ "type": "roundrobin"
+ }
+ }'
+```
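If you prefer to script the Admin API call, the following Python sketch builds the same Route body as the `curl` command above and sends it with the standard library. `build_route` and `put_route` are hypothetical helpers; the Admin API address and the `X-API-KEY` header are taken from the example, and the admin key is assumed to be supplied by the caller.

```python
import json
import urllib.request

ADMIN_URL = "http://127.0.0.1:9180/apisix/admin/routes"  # Admin API address from the example


def build_route(route_id: str, uri: str, meta_format: str = "default") -> dict:
    """Hypothetical helper: the same Route body as the curl command above."""
    return {
        "id": route_id,
        "uri": uri,
        "plugins": {
            "kafka-logger": {
                "meta_format": meta_format,
                "brokers": [{"host": "127.0.0.1", "port": 9092}],
                "kafka_topic": "test2",
                "key": "key1",
                "batch_max_size": 1,
            }
        },
        "upstream": {"nodes": {"httpbin.org:80": 1}, "type": "roundrobin"},
    }


def put_route(route: dict, admin_key: str) -> int:
    """Hypothetical helper: PUT the Route and return the HTTP status code."""
    req = urllib.request.Request(
        ADMIN_URL,
        data=json.dumps(route).encode(),
        method="PUT",
        headers={"X-API-KEY": admin_key, "Content-Type": "application/json"},
    )
    # urlopen raises urllib.error.HTTPError for 4xx/5xx responses.
    with urllib.request.urlopen(req) as resp:
        return resp.status
```

With APISIX running and `admin_key` exported, `put_route(build_route("kafka-logger-route", "/get"), os.environ["admin_key"])` should return a 2xx status.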
+
+Send a request to the Route to generate a log entry:
+
+```shell
+curl -i "http://127.0.0.1:9080/get"
+```
+
+You should receive an `HTTP/1.1 200 OK` response.
+
+You should see a log entry in the Kafka topic similar to the following:
+
+```json
{
- "log_format": {
- "host": "$host",
- "@timestamp": "$time_iso8601",
- "client_ip": "$remote_addr",
- "request": { "method": "$request_method", "uri": "$request_uri" },
- "response": { "status": "$status" }
+ "latency": 411.00001335144,
+ "request": {
+ "querystring": {},
+ "headers": {
+ "host": "127.0.0.1:9080",
+ "user-agent": "curl/8.7.1",
+ "accept": "*/*",
+ "x-forwarded-proto": "http",
+ "x-forwarded-host": "127.0.0.1",
+ "x-forwarded-port": "9080"
+ },
+ "method": "GET",
+ "size": 83,
+ "uri": "/get",
+ "url": "http://127.0.0.1:9080/get"
+ },
+ "response": {
+ "headers": {
+ "content-length": "233",
+ "access-control-allow-credentials": "true",
+ "content-type": "application/json",
+ "connection": "close",
+ "access-control-allow-origin": "*",
+ "date": "Fri, 10 Nov 2023 06:02:44 GMT",
+ "server": "APISIX/3.16.0"
+ },
+ "status": 200,
+ "size": 475
+ },
+ "route_id": "kafka-logger-route",
+ "client_ip": "127.0.0.1",
+ "server": {
+ "hostname": "apisix",
+ "version": "3.16.0"
+ },
+ "apisix_latency": 18.00001335144,
+ "service_id": "",
+ "upstream_latency": 393,
+ "start_time": 1699596164550,
+ "upstream": "54.90.18.68:80"
+}
+```
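The latency fields in the sample entry are related: `latency` (411.00001335144) equals `apisix_latency` (18.00001335144) plus `upstream_latency` (393), that is, time spent in APISIX plus time spent waiting for the upstream, in milliseconds. The Python sketch below checks that relation on a log entry consumed from the topic. `check_latency` is a hypothetical helper, and the relation is inferred from the sample values rather than from a documented guarantee.

```python
import json
import math


def check_latency(entry_json: str) -> bool:
    """Hypothetical helper: does latency == apisix_latency + upstream_latency?"""
    entry = json.loads(entry_json)
    if "upstream_latency" not in entry:
        return False  # e.g. if the request never reached an upstream
    return math.isclose(
        entry["latency"],
        entry["apisix_latency"] + entry["upstream_latency"],
        rel_tol=1e-9,  # tolerate floating-point rounding
    )
```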
+
+Update the meta log format to `origin`:
+
+```shell
+curl "http://127.0.0.1:9180/apisix/admin/routes/kafka-logger-route" -X PATCH \
+ -H "X-API-KEY: ${admin_key}" \
+ -d '{
+ "plugins": {
+ "kafka-logger": {
+ "meta_format": "origin"
+ }
}
-}'
+ }'
```
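The `PATCH` request above sends only `meta_format`, yet the Route keeps its `brokers`, `kafka_topic`, and other settings. That outcome is consistent with the Admin API merging nested objects recursively. The Python sketch below models this assumed behavior; `merge_patch` is hypothetical and treats non-object values (including arrays) as replaced wholesale, which is an assumption about APISIX rather than a documented guarantee.

```python
import copy
from typing import Any, Dict


def merge_patch(current: Dict[str, Any], patch: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical helper: recursively merge `patch` into a copy of `current`."""
    result = copy.deepcopy(current)
    for key, value in patch.items():
        if isinstance(value, dict) and isinstance(result.get(key), dict):
            result[key] = merge_patch(result[key], value)  # merge nested objects
        else:
            result[key] = copy.deepcopy(value)  # scalars and arrays are replaced
    return result
```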
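-Once configured, you will see logs similar to the following in your logging system:
+Send a request to the Route again to generate a new log entry: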
```shell
-{"host":"localhost","@timestamp":"2020-09-23T19:05:05-04:00","client_ip":"127.0.0.1","request":{"method":"GET","uri":"/hello"},"response":{"status":200},"route_id":"1"}
-{"host":"localhost","@timestamp":"2020-09-23T19:05:05-04:00","client_ip":"127.0.0.1","request":{"method":"GET","uri":"/hello"},"response":{"status":200},"route_id":"1"}
+curl -i "http://127.0.0.1:9080/get"
+```
+
+You should receive an `HTTP/1.1 200 OK` response.
+
+You should see a log entry in the Kafka topic similar to the following:
+
+```text
+GET /get HTTP/1.1
+x-forwarded-proto: http
+x-forwarded-host: 127.0.0.1
+user-agent: curl/8.7.1
+x-forwarded-port: 9080
+host: 127.0.0.1:9080
+accept: */*
```
-## Enable Plugin
+### Log Request and Response Headers With Plugin Metadata
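-You can enable the `kafka-logger` Plugin on a specific Route with the following command:
+The following example demonstrates how to use [Plugin Metadata](../terminology/plugin-metadata.md) and [built-in variables](../apisix-variable.md) to customize the log format and log specific headers from requests and responses.
+
+Plugin metadata configures the common metadata fields shared by all instances of the same Plugin. It is useful when a Plugin is enabled on multiple resources and its metadata fields need to be updated uniformly.
+
+First, create a Route with the `kafka-logger` Plugin enabled. Set `meta_format` to `default` (required when customizing the log format through Plugin metadata) and set `batch_max_size` to `1` to send log entries immediately: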
```shell
-curl http://127.0.0.1:9180/apisix/admin/routes/1 \
--H "X-API-KEY: $admin_key" -X PUT -d '
-{
+curl "http://127.0.0.1:9180/apisix/admin/routes" -X PUT \
+ -H "X-API-KEY: ${admin_key}" \
+ -d '{
+ "id": "kafka-logger-route",
+ "uri": "/get",
"plugins": {
- "kafka-logger": {
- "brokers" : [
- {
- "host": "127.0.0.1",
- "port": 9092
- }
- ],
- "kafka_topic" : "test2",
- "key" : "key1"
- }
+ "kafka-logger": {
+ "meta_format": "default",
+ "brokers": [
+ {
+ "host": "127.0.0.1",
+ "port": 9092
+ }
+ ],
+ "kafka_topic": "test2",
+ "key": "key1",
+ "batch_max_size": 1
+ }
},
"upstream": {
- "nodes": {
- "127.0.0.1:1980": 1
- },
- "type": "roundrobin"
- },
- "uri": "/hello"
-}'
+ "nodes": {
+ "httpbin.org:80": 1
+ },
+ "type": "roundrobin"
+ }
+ }'
```
-The Plugin also supports pushing to multiple Brokers at once, as shown below:
+:::note
+
+If `meta_format` is set to `origin`, log entries remain in the `origin` format regardless of the log format configured in the Plugin metadata.
+
+:::
+
+Next, configure the Plugin metadata for `kafka-logger` to log the custom request header `env` and the response header `Content-Type`:
+
+```shell
+curl "http://127.0.0.1:9180/apisix/admin/plugin_metadata/kafka-logger" -X PUT \
+ -H "X-API-KEY: ${admin_key}" \
+ -d '{
+ "log_format": {
+ "host": "$host",
+ "@timestamp": "$time_iso8601",
+ "client_ip": "$remote_addr",
+ "env": "$http_env",
+ "resp_content_type": "$sent_http_Content_Type"
+ }
+ }'
+```
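The variable names above follow NGINX's naming rule for header variables: `$http_<name>` for a request header and `$sent_http_<name>` for a response header, where the header name is lowercased and dashes become underscores. The Python sketch below derives such a `log_format`. `build_log_format` and its helpers are hypothetical; they produce the lowercase form `$sent_http_content_type`, whereas the example above writes `$sent_http_Content_Type`. Both are expected to resolve to the same header because variable lookups are assumed to be case-insensitive; verify this in your environment.

```python
from typing import Dict, Iterable


def _suffix(header: str) -> str:
    # NGINX rule: lowercase the field name and replace dashes with underscores.
    return header.lower().replace("-", "_")


def request_header_var(header: str) -> str:
    return "$http_" + _suffix(header)


def response_header_var(header: str) -> str:
    return "$sent_http_" + _suffix(header)


def build_log_format(request_headers: Iterable[str], response_headers: Iterable[str]) -> Dict[str, str]:
    """Hypothetical helper: base fields plus one entry per requested header."""
    fmt = {"host": "$host", "@timestamp": "$time_iso8601", "client_ip": "$remote_addr"}
    for header in request_headers:
        fmt[_suffix(header)] = request_header_var(header)
    for header in response_headers:
        fmt["resp_" + _suffix(header)] = response_header_var(header)
    return fmt
```

`build_log_format(["env"], ["Content-Type"])` returns the same keys as the metadata body above, and `json.dumps({"log_format": ...})` of it can serve as the request body.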
+
+Send a request to the Route with the `env` header:
+
+```shell
+curl -i "http://127.0.0.1:9080/get" -H "env: dev"
+```
+
+You should see a log entry in the Kafka topic similar to the following:
```json
-"brokers" : [
- {
- "host" :"127.0.0.1",
- "port" : 9092
+{
+ "@timestamp": "2023-11-10T23:09:04+00:00",
+ "host": "127.0.0.1",
+ "client_ip": "127.0.0.1",
+ "route_id": "kafka-logger-route",
+ "env": "dev",
+ "resp_content_type": "application/json"
+}
+```
+
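+### Log Request Bodies Conditionally
+
+The following example demonstrates how to log request bodies conditionally.
+
+Create a Route with the `kafka-logger` Plugin enabled. Set `include_req_body` to `true` to include the request body, and set `include_req_body_expr` so that the body is included only when the URL query string parameter `log_body` is `yes`: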
+
+```shell
+curl "http://127.0.0.1:9180/apisix/admin/routes" -X PUT \
+ -H "X-API-KEY: ${admin_key}" \
+ -d '{
+ "id": "kafka-logger-route",
+ "uri": "/post",
+ "plugins": {
+ "kafka-logger": {
+ "brokers": [
+ {
+ "host": "127.0.0.1",
+ "port": 9092
+ }
+ ],
+ "kafka_topic": "test2",
+ "key": "key1",
+ "batch_max_size": 1,
+ "include_req_body": true,
+ "include_req_body_expr": [["arg_log_body", "==", "yes"]]
+ }
},
- {
- "host" :"127.0.0.1",
- "port" : 9093
+ "upstream": {
+ "nodes": {
+ "httpbin.org:80": 1
+ },
+ "type": "roundrobin"
}
-],
+ }'
```
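The condition `[["arg_log_body", "==", "yes"]]` is a lua-resty-expr rule list: each rule is `[variable, operator, value]`, and `arg_log_body` is the NGINX variable holding the `log_body` query string argument. The Python sketch below evaluates such a list for a given URL. It is a hypothetical, heavily simplified stand-in for lua-resty-expr: it supports only the `==` and `~=` (not equal) operators, assumes every rule must hold, and takes the first value of a repeated query argument.

```python
from typing import Any, Callable, Dict, Sequence
from urllib.parse import parse_qs, urlsplit

# Only two lua-resty-expr operators are modeled here.
OPERATORS: Dict[str, Callable[[Any, Any], bool]] = {
    "==": lambda left, right: left == right,
    "~=": lambda left, right: left != right,
}


def vars_from_url(url: str) -> Dict[str, str]:
    """Expose query arguments as arg_<name> variables, keeping the first value."""
    query = parse_qs(urlsplit(url).query)
    return {f"arg_{name}": values[0] for name, values in query.items()}


def eval_rules(rules: Sequence[Sequence[Any]], variables: Dict[str, Any]) -> bool:
    """Hypothetical helper: True only if every [var, op, value] rule holds."""
    return all(OPERATORS[op](variables.get(var), value) for var, op, value in rules)
```

A missing argument resolves to `None`, so a request to `/post` without `log_body` fails the `==` rule and its body is not logged.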
-## Test Plugin
-
-You can make a request to APISIX with the following command:
+Send a request that satisfies the condition (with the query string):
```shell
-curl -i http://127.0.0.1:9080/hello
+curl -i "http://127.0.0.1:9080/post?log_body=yes" -X POST -d '{"env": "dev"}'
```
-## Delete Plugin
+You should see the request body logged:
+
+```json
+{
+ "...",
+ "request": {
+ "method": "POST",
+ "body": "{\"env\": \"dev\"}",
+ "size": 179
+ }
+}
+```
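-To remove the Plugin, delete the corresponding JSON configuration with the following command. APISIX automatically reloads the configuration, so no restart is needed:
+Send a request without the query string: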
```shell
-curl http://127.0.0.1:9180/apisix/admin/routes/1 \
--H "X-API-KEY: $admin_key" -X PUT -d '
+curl -i "http://127.0.0.1:9080/post" -X POST -d '{"env": "dev"}'
+```
+
+The request body should not be included in the log.
+
+:::note
+
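+If you customize `log_format` while also setting `include_req_body` or `include_resp_body` to `true`, the Plugin will not include the request or response body in the log. As a workaround, you can use the NGINX variable `$request_body` in the log format: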
+
+```json
{
- "methods": ["GET"],
- "uri": "/hello",
- "plugins": {},
- "upstream": {
- "type": "roundrobin",
- "nodes": {
- "127.0.0.1:1980": 1
- }
- }
-}'
+ "kafka-logger": {
+ "log_format": {"body": "$request_body"}
+ }
+}
```
+
+:::