This is an automated email from the ASF dual-hosted git repository. ashishtiwari pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/apisix.git
The following commit(s) were added to refs/heads/master by this push: new 78e8fc0de fix: support multiple versions of Elasticsearch logger (#12364) 78e8fc0de is described below commit 78e8fc0debfe6f1efff9de0ae4f98b3d86af1de2 Author: Ashish Tiwari <ashishjaitiwari15112...@gmail.com> AuthorDate: Thu Jun 26 17:35:25 2025 +0530 fix: support multiple versions of Elasticsearch logger (#12364) --- apisix/plugins/elasticsearch-logger.lua | 91 +++++++++-- ci/pod/docker-compose.plugin.yml | 38 +++++ docs/en/latest/plugins/elasticsearch-logger.md | 16 +- docs/zh/latest/plugins/elasticsearch-logger.md | 16 +- t/plugin/elasticsearch-logger.t | 201 ++++++++++++++++++++++++- 5 files changed, 321 insertions(+), 41 deletions(-) diff --git a/apisix/plugins/elasticsearch-logger.lua b/apisix/plugins/elasticsearch-logger.lua index 2b126c434..09dcbd795 100644 --- a/apisix/plugins/elasticsearch-logger.lua +++ b/apisix/plugins/elasticsearch-logger.lua @@ -27,7 +27,6 @@ local math_random = math.random local plugin_name = "elasticsearch-logger" local batch_processor_manager = bp_manager_mod.new(plugin_name) - local schema = { type = "object", properties = { @@ -48,7 +47,6 @@ local schema = { type = "object", properties = { index = { type = "string"}, - type = { type = "string"} }, required = {"index"} }, @@ -124,32 +122,96 @@ function _M.check_schema(conf, schema_type) if schema_type == core.schema.TYPE_METADATA then return core.schema.check(metadata_schema, conf) end + local check = {"endpoint_addrs"} core.utils.check_https(check, conf, plugin_name) core.utils.check_tls_bool({"ssl_verify"}, conf, plugin_name) - return core.schema.check(schema, conf) end +local function get_es_major_version(uri, conf) + local httpc = http.new() + if not httpc then + return nil, "failed to create http client" + end + local headers = {} + if conf.auth then + local authorization = "Basic " .. ngx.encode_base64( + conf.auth.username .. ":" .. conf.auth.password + ) + headers["Authorization"] = authorization + end + httpc:set_timeout(conf.timeout * 1000) + local res, err = httpc:request_uri(uri, { + ssl_verify = conf.ssl_verify, + method = "GET", + headers = headers, + }) + if not res then + return false, err + end + if res.status ~= 200 then + return nil, str_format("server returned status: %d, body: %s", + res.status, res.body or "") + end + local json_body, err = core.json.decode(res.body) + if not json_body then + return nil, "failed to decode response body: " .. err + end + if not json_body.version or not json_body.version.number then + return nil, "failed to get version from response body" + end + + local major_version = json_body.version.number:match("^(%d+)%.") + if not major_version then + return nil, "invalid version format: " .. json_body.version.number + end + + return major_version +end + + local function get_logger_entry(conf, ctx) local entry = log_util.get_log_entry(plugin_name, conf, ctx) - return core.json.encode({ - create = { - _index = conf.field.index, - _type = conf.field.type - } - }) .. "\n" .. + local body = { + index = { + _index = conf.field.index + } + } + -- for older version type is required + if conf._version == "6" or conf._version == "5" then + body.index._type = "_doc" + end + return core.json.encode(body) .. "\n" .. core.json.encode(entry) .. "\n" end +local function fetch_and_update_es_version(conf) + if conf._version then + return + end + local selected_endpoint_addr + if conf.endpoint_addr then + selected_endpoint_addr = conf.endpoint_addr + else + selected_endpoint_addr = conf.endpoint_addrs[math_random(#conf.endpoint_addrs)] + end + local major_version, err = get_es_major_version(selected_endpoint_addr, conf) + if err then + core.log.error("failed to get Elasticsearch version: ", err) + return + end + conf._version = major_version +end + local function send_to_elasticsearch(conf, entries) local httpc, err = http.new() if not httpc then return false, str_format("create http error: %s", err) end - + fetch_and_update_es_version(conf) local selected_endpoint_addr if conf.endpoint_addr then selected_endpoint_addr = conf.endpoint_addr @@ -159,8 +221,8 @@ local function send_to_elasticsearch(conf, entries) local uri = selected_endpoint_addr .. "/_bulk" local body = core.table.concat(entries, "") local headers = { - ["Content-Type"] = "application/x-ndjson;compatible-with=7", - ["Accept"] = "application/vnd.elasticsearch+json;compatible-with=7" + ["Content-Type"] = "application/x-ndjson", + ["Accept"] = "application/vnd.elasticsearch+json" } if conf.auth then local authorization = "Basic " .. ngx.encode_base64( @@ -195,6 +257,11 @@ function _M.body_filter(conf, ctx) log_util.collect_body(conf, ctx) end +function _M.access(conf) + -- fetch_and_update_es_version will call ES server only the first time + -- so this should not amount to considerable overhead + fetch_and_update_es_version(conf) +end function _M.log(conf, ctx) local entry = get_logger_entry(conf, ctx) diff --git a/ci/pod/docker-compose.plugin.yml b/ci/pod/docker-compose.plugin.yml index 55f2b443c..c0a598c80 100644 --- a/ci/pod/docker-compose.plugin.yml +++ b/ci/pod/docker-compose.plugin.yml @@ -225,6 +225,44 @@ services: http.port: 9201 xpack.security.enabled: 'true' + elasticsearch-auth-2: + image: docker.elastic.co/elasticsearch/elasticsearch:9.0.2 + restart: unless-stopped + ports: + - "9301:9201" + environment: + ES_JAVA_OPTS: -Xms512m -Xmx512m + discovery.type: single-node + ELASTIC_USERNAME: elastic + ELASTIC_PASSWORD: 123456 + http.port: 9201 + xpack.security.enabled: 'true' + + elasticsearch-auth-3: + image: docker.elastic.co/elasticsearch/elasticsearch:7.0.0 + restart: unless-stopped + ports: + - "9401:9201" + environment: + ES_JAVA_OPTS: -Xms512m -Xmx512m + discovery.type: single-node + ELASTIC_USERNAME: elastic + ELASTIC_PASSWORD: 123456 + http.port: 9201 + xpack.security.enabled: 'true' + + elasticsearch-auth-4: + image: docker.elastic.co/elasticsearch/elasticsearch:6.7.0 + restart: unless-stopped + ports: + - "9501:9201" + environment: + ES_JAVA_OPTS: -Xms512m -Xmx512m + discovery.type: single-node + ELASTIC_USERNAME: elastic + ELASTIC_PASSWORD: 123456 + http.port: 9201 + xpack.security.enabled: 'true' # The function services of OpenFunction test-header: diff --git a/docs/en/latest/plugins/elasticsearch-logger.md b/docs/en/latest/plugins/elasticsearch-logger.md index c0af4b159..80ea07601 100644 --- a/docs/en/latest/plugins/elasticsearch-logger.md +++ b/docs/en/latest/plugins/elasticsearch-logger.md @@ -42,7 +42,6 @@ The `elasticsearch-logger` Plugin pushes request and response logs in batches to | endpoint_addrs | array[string] | True | | Elasticsearch API endpoint addresses. If multiple endpoints are configured, they will be written randomly. | | field | object | True | | Elasticsearch `field` configuration. | | field.index | string | True | | Elasticsearch [_index field](https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-index-field.html#mapping-index-field). | -| field.type | string | False | Elasticsearch default value | Elasticsearch [_type field](https://www.elastic.co/guide/en/elasticsearch/reference/7.17/mapping-type-field.html#mapping-type-field). | | log_format | object | False | | Custom log format in key-value pairs in JSON format. Support [APISIX](../apisix-variable.md) or [NGINX variables](http://nginx.org/en/docs/varindex.html) in values. | | auth | array | False | | Elasticsearch [authentication](https://www.elastic.co/guide/en/elasticsearch/reference/current/setting-up-authentication.html) configuration. | | auth.username | string | True | | Elasticsearch [authentication](https://www.elastic.co/guide/en/elasticsearch/reference/current/setting-up-authentication.html) username. | @@ -110,7 +109,7 @@ admin_key=$(yq '.deployment.admin.admin_key[0].key' conf/config.yaml | sed 's/"/ The following example demonstrates how you can enable the `elasticsearch-logger` Plugin on a route, which logs client requests and responses to the Route and pushes logs to Elasticsearch. -Create a Route with `elasticsearch-logger` to configure the `index` field as `gateway` and the `type` field as `logs`: +Create a Route with `elasticsearch-logger` to configure the `index` field as `gateway`: ```shell curl "http://127.0.0.1:9180/apisix/admin/routes" -X PUT \ @@ -122,8 +121,7 @@ curl "http://127.0.0.1:9180/apisix/admin/routes" -X PUT \ "elasticsearch-logger": { "endpoint_addrs": ["http://elasticsearch:9200"], "field": { - "index": "gateway", - "type": "logs" + "index": "gateway" } } }, @@ -149,7 +147,6 @@ Navigate to the Kibana dashboard on [localhost:5601](http://localhost:5601) and ```json { "_index": "gateway", - "_type": "logs", "_id": "CE-JL5QBOkdYRG7kEjTJ", "_version": 1, "_score": 1, @@ -215,8 +212,7 @@ curl "http://127.0.0.1:9180/apisix/admin/routes" -X PUT \ "elasticsearch-logger": { "endpoint_addrs": ["http://elasticsearch:9200"], "field": { - "index": "gateway", - "type": "logs" + "index": "gateway" } }, "upstream": { @@ -257,7 +253,6 @@ Navigate to the Kibana dashboard on [localhost:5601](http://localhost:5601) and ```json { "_index": "gateway", - "_type": "logs", "_id": "Ck-WL5QBOkdYRG7kODS0", "_version": 1, "_score": 1, @@ -288,8 +283,7 @@ curl "http://127.0.0.1:9180/apisix/admin/routes" -X PUT \ "elasticsearch-logger": { "endpoint_addrs": ["http://elasticsearch:9200"], "field": { - "index": "gateway", - "type": "logs" + "index": "gateway" }, "include_req_body": true, "include_req_body_expr": [["arg_log_body", "==", "yes"]] @@ -319,7 +313,6 @@ Navigate to the Kibana dashboard on [localhost:5601](http://localhost:5601) and ```json { "_index": "gateway", - "_type": "logs", "_id": "Dk-cL5QBOkdYRG7k7DSW", "_version": 1, "_score": 1, @@ -384,7 +377,6 @@ Navigate to the Kibana dashboard __Discover__ tab and you should see a log gener ```json { "_index": "gateway", - "_type": "logs", "_id": "EU-eL5QBOkdYRG7kUDST", "_version": 1, "_score": 1, diff --git a/docs/zh/latest/plugins/elasticsearch-logger.md b/docs/zh/latest/plugins/elasticsearch-logger.md index b4332fe59..362b57959 100644 --- a/docs/zh/latest/plugins/elasticsearch-logger.md +++ b/docs/zh/latest/plugins/elasticsearch-logger.md @@ -43,7 +43,6 @@ description: elasticsearch-logger Plugin 将请求和响应日志批量推送到 | endup_addrs | array[string] | 是 | | Elasticsearch API 端点地址。如果配置了多个端点,则会随机写入。 | | field | object | 是 | | Elasticsearch `field` 配置。 | | field.index | string | 是 | | Elasticsearch [_index 字段](https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-index-field.html#mapping-index-field)。 | -| field.type | string | 否 | Elasticsearch 默认值 | Elasticsearch [_type 字段](https://www.elastic.co/guide/en/elasticsearch/reference/7.17/mapping-type-field.html#mapping-type-field)。 | | log_format | object | 否 | | JSON 格式的键值对中的自定义日志格式。值中支持 [APISIX](../apisix-variable.md) 或 [NGINX 变量](http://nginx.org/en/docs/varindex.html)。 | | auth | array | 否 | | Elasticsearch [身份验证](https://www.elastic.co/guide/en/elasticsearch/reference/current/setting-up-authentication.html) 配置。 | | auth.username | string | 是 | | Elasticsearch [身份验证](https://www.elastic.co/guide/en/elasticsearch/reference/current/setting-up-authentication.html) 用户名。 | @@ -111,7 +110,7 @@ admin_key=$(yq '.deployment.admin.admin_key[0].key' conf/config.yaml | sed 's/"/ 以下示例演示如何在路由上启用 `elasticsearch-logger` 插件,该插件记录客户端对路由的请求和响应,并将日志推送到 Elasticsearch。 -使用 `elasticsearch-logger` 创建路由,将 `index` 字段配置为 `gateway`,将 `type` 字段配置为 `logs`: +使用 `elasticsearch-logger` 创建路由,将 `index` 字段配置为 `gateway`: ```shell curl "http://127.0.0.1:9180/apisix/admin/routes" -X PUT \ @@ -123,8 +122,7 @@ curl "http://127.0.0.1:9180/apisix/admin/routes" -X PUT \ "elasticsearch-logger": { "endpoint_addrs": ["http://elasticsearch:9200"], "field": { - "index": "gateway", - "type": "logs" + "index": "gateway" } } }, @@ -150,7 +148,6 @@ curl -i "http://127.0.0.1:9080/anything" ```json { "_index": "gateway", - "_type": "logs", "_id": "CE-JL5QBOkdYRG7kEjTJ", "_version": 1, "_score": 1, @@ -216,8 +213,7 @@ curl "http://127.0.0.1:9180/apisix/admin/routes" -X PUT \ "elasticsearch-logger": { "endpoint_addrs": ["http://elasticsearch:9200"], "field": { - "index": "gateway", - "type": "logs" + "index": "gateway" } }, "upstream": { @@ -258,7 +254,6 @@ curl -i "http://127.0.0.1:9080/anything" -H "env: dev" ```json { "_index": "gateway", - "_type": "logs", "_id": "Ck-WL5QBOkdYRG7kODS0", "_version": 1, "_score": 1, @@ -289,8 +284,7 @@ curl "http://127.0.0.1:9180/apisix/admin/routes" -X PUT \ "elasticsearch-logger": { "endpoint_addrs": ["http://elasticsearch:9200"], "field": { - "index": "gateway", - "type": "logs" + "index": "gateway" }, "include_req_body": true, "include_req_body_expr": [["arg_log_body", "==", "yes"]] @@ -320,7 +314,6 @@ curl -i "http://127.0.0.1:9080/anything?log_body=yes" -X POST -d '{"env": "dev"} ```json { "_index": "gateway", - "_type": "logs", "_id": "Dk-cL5QBOkdYRG7k7DSW", "_version": 1, "_score": 1, @@ -385,7 +378,6 @@ curl -i "http://127.0.0.1:9080/anything" -X POST -d '{"env": "dev"}' ```json { "_index": "gateway", - "_type": "logs", "_id": "EU-eL5QBOkdYRG7kUDST", "_version": 1, "_score": 1, diff --git a/t/plugin/elasticsearch-logger.t b/t/plugin/elasticsearch-logger.t index 83960fbac..7b081032a 100644 --- a/t/plugin/elasticsearch-logger.t +++ b/t/plugin/elasticsearch-logger.t @@ -46,8 +46,7 @@ __DATA__ { endpoint_addr = "http://127.0.0.1:9200", field = { - index = "services", - type = "collector" + index = "services" }, auth = { username = "elastic", @@ -165,6 +164,18 @@ passed end) http.request_uri = function(self, uri, params) + if params.method == "GET" then + return { + status = 200, + body = [[ + { + "version": { + "number": "8.10.2" + } + } + ]] + } + end if not params.body or type(params.body) ~= "string" then return nil, "invalid params body" end @@ -293,9 +304,7 @@ GET /hello --- response_body hello world --- error_log -Batch Processor[elasticsearch-logger] failed to process entries: elasticsearch server returned status: 401 -"reason":"missing authentication credentials for REST request [/_bulk]" -Batch Processor[elasticsearch-logger] exceeded the max_retry_count +failed to process entries: elasticsearch server returned status: 401 @@ -416,6 +425,18 @@ passed end) http.request_uri = function(self, uri, params) + if params.method == "GET" then + return { + status = 200, + body = [[ + { + "version": { + "number": "8.10.2" + } + } + ]] + } + end if not params.body or type(params.body) ~= "string" then return nil, "invalid params body" end @@ -637,6 +658,18 @@ passed end) http.request_uri = function(self, uri, params) + if params.method == "GET" then + return { + status = 200, + body = [[ + { + "version": { + "number": "8.10.2" + } + } + ]] + } + end if not params.body or type(params.body) ~= "string" then return nil, "invalid params body" end @@ -744,6 +777,10 @@ Action/metadata line [1] contains an unknown parameter [_type] field = { index = "services" }, + auth = { + username = "elastic", + password = "123456" + }, batch_max_size = 1, inactive_timeout = 1, include_req_body = true @@ -784,6 +821,10 @@ Action/metadata line [1] contains an unknown parameter [_type] field = { index = "services" }, + auth = { + username = "elastic", + password = "123456" + }, batch_max_size = 1, inactive_timeout = 1, include_req_body = true, @@ -801,3 +842,153 @@ Action/metadata line [1] contains an unknown parameter [_type] } --- error_log "body":"hello world\n" + + + +=== TEST 21: set route (auth) - check compat with version 9 +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/1', ngx.HTTP_PUT, { + uri = "/hello", + upstream = { + type = "roundrobin", + nodes = { + ["127.0.0.1:1980"] = 1 + } + }, + plugins = { + ["elasticsearch-logger"] = { + endpoint_addr = "http://127.0.0.1:9301", + field = { + index = "services" + }, + auth = { + username = "elastic", + password = "123456" + }, + batch_max_size = 1, + inactive_timeout = 1 + } + } + }) + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } +--- response_body +passed + + + +=== TEST 22: test route (auth success) +--- request +GET /hello +--- wait: 2 +--- response_body +hello world +--- error_log +Batch Processor[elasticsearch-logger] successfully processed the entries + + + +=== TEST 23: set route (auth) - check compat with version 7 +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/1', ngx.HTTP_PUT, { + uri = "/hello", + upstream = { + type = "roundrobin", + nodes = { + ["127.0.0.1:1980"] = 1 + } + }, + plugins = { + ["elasticsearch-logger"] = { + endpoint_addr = "http://127.0.0.1:9401", + field = { + index = "services" + }, + auth = { + username = "elastic", + password = "123456" + }, + batch_max_size = 1, + inactive_timeout = 1 + } + } + }) + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } +--- response_body +passed + + + +=== TEST 24: test route (auth success) +--- request +GET /hello +--- wait: 2 +--- response_body +hello world +--- error_log +Batch Processor[elasticsearch-logger] successfully processed the entries + + + +=== TEST 25: set route (auth) - check compat with version 6 +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/1', ngx.HTTP_PUT, { + uri = "/hello", + upstream = { + type = "roundrobin", + nodes = { + ["127.0.0.1:1980"] = 1 + } + }, + plugins = { + ["elasticsearch-logger"] = { + endpoint_addr = "http://127.0.0.1:9501", + field = { + index = "services" + }, + auth = { + username = "elastic", + password = "123456" + }, + batch_max_size = 1, + inactive_timeout = 1 + } + } + }) + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } +--- response_body +passed + + + +=== TEST 26: test route (auth success) +--- request +GET /hello +--- wait: 2 +--- response_body +hello world +--- error_log +Batch Processor[elasticsearch-logger] successfully processed the entries