This is an automated email from the ASF dual-hosted git repository.

ashishtiwari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix.git


The following commit(s) were added to refs/heads/master by this push:
     new 78e8fc0de fix: support multiple versions of Elasticsearch logger 
(#12364)
78e8fc0de is described below

commit 78e8fc0debfe6f1efff9de0ae4f98b3d86af1de2
Author: Ashish Tiwari <ashishjaitiwari15112...@gmail.com>
AuthorDate: Thu Jun 26 17:35:25 2025 +0530

    fix: support multiple versions of Elasticsearch logger (#12364)
---
 apisix/plugins/elasticsearch-logger.lua        |  91 +++++++++--
 ci/pod/docker-compose.plugin.yml               |  38 +++++
 docs/en/latest/plugins/elasticsearch-logger.md |  16 +-
 docs/zh/latest/plugins/elasticsearch-logger.md |  16 +-
 t/plugin/elasticsearch-logger.t                | 201 ++++++++++++++++++++++++-
 5 files changed, 321 insertions(+), 41 deletions(-)

diff --git a/apisix/plugins/elasticsearch-logger.lua 
b/apisix/plugins/elasticsearch-logger.lua
index 2b126c434..09dcbd795 100644
--- a/apisix/plugins/elasticsearch-logger.lua
+++ b/apisix/plugins/elasticsearch-logger.lua
@@ -27,7 +27,6 @@ local math_random     = math.random
 local plugin_name = "elasticsearch-logger"
 local batch_processor_manager = bp_manager_mod.new(plugin_name)
 
-
 local schema = {
     type = "object",
     properties = {
@@ -48,7 +47,6 @@ local schema = {
             type = "object",
             properties = {
                 index = { type = "string"},
-                type = { type = "string"}
             },
             required = {"index"}
         },
@@ -124,32 +122,96 @@ function _M.check_schema(conf, schema_type)
     if schema_type == core.schema.TYPE_METADATA then
         return core.schema.check(metadata_schema, conf)
     end
+
     local check = {"endpoint_addrs"}
     core.utils.check_https(check, conf, plugin_name)
     core.utils.check_tls_bool({"ssl_verify"}, conf, plugin_name)
-
     return core.schema.check(schema, conf)
 end
 
 
+local function get_es_major_version(uri, conf)
+    local httpc = http.new()
+    if not httpc then
+        return nil, "failed to create http client"
+    end
+    local headers = {}
+    if conf.auth then
+        local authorization = "Basic " .. ngx.encode_base64(
+            conf.auth.username .. ":" .. conf.auth.password
+        )
+        headers["Authorization"] = authorization
+    end
+    httpc:set_timeout(conf.timeout * 1000)
+    local res, err = httpc:request_uri(uri, {
+        ssl_verify = conf.ssl_verify,
+        method = "GET",
+        headers = headers,
+    })
+    if not res then
+        return false, err
+    end
+    if res.status ~= 200 then
+        return nil, str_format("server returned status: %d, body: %s",
+            res.status, res.body or "")
+    end
+    local json_body, err = core.json.decode(res.body)
+    if not json_body then
+        return nil, "failed to decode response body: " .. err
+    end
+    if not json_body.version or not json_body.version.number then
+        return nil, "failed to get version from response body"
+    end
+
+    local major_version = json_body.version.number:match("^(%d+)%.")
+    if not major_version then
+        return nil, "invalid version format: " .. json_body.version.number
+    end
+
+    return major_version
+end
+
+
 local function get_logger_entry(conf, ctx)
     local entry = log_util.get_log_entry(plugin_name, conf, ctx)
-    return core.json.encode({
-            create = {
-                _index = conf.field.index,
-                _type = conf.field.type
-            }
-        }) .. "\n" ..
+    local body = {
+        index = {
+            _index = conf.field.index
+        }
+    }
+    -- for older version type is required
+    if conf._version == "6" or conf._version == "5" then
+        body.index._type = "_doc"
+    end
+    return core.json.encode(body) .. "\n" ..
         core.json.encode(entry) .. "\n"
 end
 
+local function fetch_and_update_es_version(conf)
+    if conf._version then
+        return
+    end
+    local selected_endpoint_addr
+    if conf.endpoint_addr then
+        selected_endpoint_addr = conf.endpoint_addr
+    else
+        selected_endpoint_addr = 
conf.endpoint_addrs[math_random(#conf.endpoint_addrs)]
+    end
+    local major_version, err = get_es_major_version(selected_endpoint_addr, 
conf)
+    if err then
+        core.log.error("failed to get Elasticsearch version: ", err)
+        return
+    end
+    conf._version = major_version
+end
+
 
 local function send_to_elasticsearch(conf, entries)
     local httpc, err = http.new()
     if not httpc then
         return false, str_format("create http error: %s", err)
     end
-
+    fetch_and_update_es_version(conf)
     local selected_endpoint_addr
     if conf.endpoint_addr then
         selected_endpoint_addr = conf.endpoint_addr
@@ -159,8 +221,8 @@ local function send_to_elasticsearch(conf, entries)
     local uri = selected_endpoint_addr .. "/_bulk"
     local body = core.table.concat(entries, "")
     local headers = {
-        ["Content-Type"] = "application/x-ndjson;compatible-with=7",
-        ["Accept"] = "application/vnd.elasticsearch+json;compatible-with=7"
+        ["Content-Type"] = "application/x-ndjson",
+        ["Accept"] = "application/vnd.elasticsearch+json"
     }
     if conf.auth then
         local authorization = "Basic " .. ngx.encode_base64(
@@ -195,6 +257,11 @@ function _M.body_filter(conf, ctx)
     log_util.collect_body(conf, ctx)
 end
 
+function _M.access(conf)
+    -- fetch_and_update_es_version will call ES server only the first time
+    -- so this should not amount to considerable overhead
+    fetch_and_update_es_version(conf)
+end
 
 function _M.log(conf, ctx)
     local entry = get_logger_entry(conf, ctx)
diff --git a/ci/pod/docker-compose.plugin.yml b/ci/pod/docker-compose.plugin.yml
index 55f2b443c..c0a598c80 100644
--- a/ci/pod/docker-compose.plugin.yml
+++ b/ci/pod/docker-compose.plugin.yml
@@ -225,6 +225,44 @@ services:
       http.port: 9201
       xpack.security.enabled: 'true'
 
+  elasticsearch-auth-2:
+    image: docker.elastic.co/elasticsearch/elasticsearch:9.0.2
+    restart: unless-stopped
+    ports:
+      - "9301:9201"
+    environment:
+      ES_JAVA_OPTS: -Xms512m -Xmx512m
+      discovery.type: single-node
+      ELASTIC_USERNAME: elastic
+      ELASTIC_PASSWORD: 123456
+      http.port: 9201
+      xpack.security.enabled: 'true'
+
+  elasticsearch-auth-3:
+    image: docker.elastic.co/elasticsearch/elasticsearch:7.0.0
+    restart: unless-stopped
+    ports:
+      - "9401:9201"
+    environment:
+      ES_JAVA_OPTS: -Xms512m -Xmx512m
+      discovery.type: single-node
+      ELASTIC_USERNAME: elastic
+      ELASTIC_PASSWORD: 123456
+      http.port: 9201
+      xpack.security.enabled: 'true'
+
+  elasticsearch-auth-4:
+    image: docker.elastic.co/elasticsearch/elasticsearch:6.7.0
+    restart: unless-stopped
+    ports:
+      - "9501:9201"
+    environment:
+      ES_JAVA_OPTS: -Xms512m -Xmx512m
+      discovery.type: single-node
+      ELASTIC_USERNAME: elastic
+      ELASTIC_PASSWORD: 123456
+      http.port: 9201
+      xpack.security.enabled: 'true'
 
   # The function services of OpenFunction
   test-header:
diff --git a/docs/en/latest/plugins/elasticsearch-logger.md 
b/docs/en/latest/plugins/elasticsearch-logger.md
index c0af4b159..80ea07601 100644
--- a/docs/en/latest/plugins/elasticsearch-logger.md
+++ b/docs/en/latest/plugins/elasticsearch-logger.md
@@ -42,7 +42,6 @@ The `elasticsearch-logger` Plugin pushes request and response 
logs in batches to
 | endpoint_addrs  | array[string] | True     |                             | 
Elasticsearch API endpoint addresses. If multiple endpoints are configured, 
they will be written randomly.            |
 | field         | object   | True     |                             | 
Elasticsearch `field` configuration.                          |
 | field.index   | string  | True     |                             | 
Elasticsearch [_index 
field](https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-index-field.html#mapping-index-field).
 |
-| field.type    | string  | False    | Elasticsearch default value | 
Elasticsearch [_type 
field](https://www.elastic.co/guide/en/elasticsearch/reference/7.17/mapping-type-field.html#mapping-type-field).
 |
 | log_format | object | False    |                             | Custom log 
format in key-value pairs in JSON format. Support 
[APISIX](../apisix-variable.md) or [NGINX 
variables](http://nginx.org/en/docs/varindex.html) in values. |
 | auth          | array   | False    |                             | 
Elasticsearch 
[authentication](https://www.elastic.co/guide/en/elasticsearch/reference/current/setting-up-authentication.html)
 configuration. |
 | auth.username | string  | True     |                             | 
Elasticsearch 
[authentication](https://www.elastic.co/guide/en/elasticsearch/reference/current/setting-up-authentication.html)
 username. |
@@ -110,7 +109,7 @@ admin_key=$(yq '.deployment.admin.admin_key[0].key' 
conf/config.yaml | sed 's/"/
 
 The following example demonstrates how you can enable the 
`elasticsearch-logger` Plugin on a route, which logs client requests and 
responses to the Route and pushes logs to Elasticsearch.
 
-Create a Route with `elasticsearch-logger` to configure the `index` field as 
`gateway` and the `type` field as `logs`:
+Create a Route with `elasticsearch-logger` to configure the `index` field as 
`gateway`:
 
 ```shell
curl "http://127.0.0.1:9180/apisix/admin/routes" -X PUT \
@@ -122,8 +121,7 @@ curl "http://127.0.0.1:9180/apisix/admin/routes"; -X PUT \
       "elasticsearch-logger": {
        "endpoint_addrs": ["http://elasticsearch:9200"],
         "field": {
-          "index": "gateway",
-          "type": "logs"
+          "index": "gateway"
         }
       }
     },
@@ -149,7 +147,6 @@ Navigate to the Kibana dashboard on 
[localhost:5601](http://localhost:5601) and
 ```json
 {
   "_index": "gateway",
-  "_type": "logs",
   "_id": "CE-JL5QBOkdYRG7kEjTJ",
   "_version": 1,
   "_score": 1,
@@ -215,8 +212,7 @@ curl "http://127.0.0.1:9180/apisix/admin/routes"; -X PUT \
       "elasticsearch-logger": {
        "endpoint_addrs": ["http://elasticsearch:9200"],
         "field": {
-          "index": "gateway",
-          "type": "logs"
+          "index": "gateway"
       }
     },
     "upstream": {
@@ -257,7 +253,6 @@ Navigate to the Kibana dashboard on 
[localhost:5601](http://localhost:5601) and
 ```json
 {
   "_index": "gateway",
-  "_type": "logs",
   "_id": "Ck-WL5QBOkdYRG7kODS0",
   "_version": 1,
   "_score": 1,
@@ -288,8 +283,7 @@ curl "http://127.0.0.1:9180/apisix/admin/routes"; -X PUT \
       "elasticsearch-logger": {
        "endpoint_addrs": ["http://elasticsearch:9200"],
         "field": {
-          "index": "gateway",
-          "type": "logs"
+          "index": "gateway"
         },
         "include_req_body": true,
         "include_req_body_expr": [["arg_log_body", "==", "yes"]]
@@ -319,7 +313,6 @@ Navigate to the Kibana dashboard on 
[localhost:5601](http://localhost:5601) and
 ```json
 {
   "_index": "gateway",
-  "_type": "logs",
   "_id": "Dk-cL5QBOkdYRG7k7DSW",
   "_version": 1,
   "_score": 1,
@@ -384,7 +377,6 @@ Navigate to the Kibana dashboard __Discover__ tab and you 
should see a log gener
 ```json
 {
   "_index": "gateway",
-  "_type": "logs",
   "_id": "EU-eL5QBOkdYRG7kUDST",
   "_version": 1,
   "_score": 1,
diff --git a/docs/zh/latest/plugins/elasticsearch-logger.md 
b/docs/zh/latest/plugins/elasticsearch-logger.md
index b4332fe59..362b57959 100644
--- a/docs/zh/latest/plugins/elasticsearch-logger.md
+++ b/docs/zh/latest/plugins/elasticsearch-logger.md
@@ -43,7 +43,6 @@ description: elasticsearch-logger Plugin 将请求和响应日志批量推送到
 | endpoint_addrs | array[string] | 是 | | Elasticsearch API 端点地址。如果配置了多个端点,则会随机写入。 
|
 | field | object | 是 | | Elasticsearch `field` 配置。 |
 | field.index | string | 是 | | Elasticsearch [_index 
字段](https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-index-field.html#mapping-index-field)。
 |
-| field.type | string | 否 | Elasticsearch 默认值 | Elasticsearch [_type 
字段](https://www.elastic.co/guide/en/elasticsearch/reference/7.17/mapping-type-field.html#mapping-type-field)。
 |
 | log_format | object | 否 | | JSON 格式的键值对中的自定义日志格式。值中支持 
[APISIX](../apisix-variable.md) 或 [NGINX 
变量](http://nginx.org/en/docs/varindex.html)。 |
 | auth | array | 否 | | Elasticsearch 
[身份验证](https://www.elastic.co/guide/en/elasticsearch/reference/current/setting-up-authentication.html)
 配置。 |
 | auth.username | string | 是 | | Elasticsearch 
[身份验证](https://www.elastic.co/guide/en/elasticsearch/reference/current/setting-up-authentication.html)
 用户名​​。 |
@@ -111,7 +110,7 @@ admin_key=$(yq '.deployment.admin.admin_key[0].key' 
conf/config.yaml | sed 's/"/
 
 以下示例演示如何在路由上启用 `elasticsearch-logger` 插件,该插件记录客户端对路由的请求和响应,并将日志推送到 
Elasticsearch。
 
-使用 `elasticsearch-logger` 创建路由,将 `index` 字段配置为 `gateway`,将 `type` 字段配置为 `logs`:
+使用 `elasticsearch-logger` 创建路由,将 `index` 字段配置为 `gateway`:
 
 ```shell
 curl "http://127.0.0.1:9180/apisix/admin/routes" -X PUT \
@@ -123,8 +122,7 @@ curl "http://127.0.0.1:9180/apisix/admin/routes"; -X PUT \
       "elasticsearch-logger": {
         "endpoint_addrs": ["http://elasticsearch:9200"],
         "field": {
-          "index": "gateway",
-          "type": "logs"
+          "index": "gateway"
         }
       }
     },
@@ -150,7 +148,6 @@ curl -i "http://127.0.0.1:9080/anything";
 ```json
 {
   "_index": "gateway",
-  "_type": "logs",
   "_id": "CE-JL5QBOkdYRG7kEjTJ",
   "_version": 1,
   "_score": 1,
@@ -216,8 +213,7 @@ curl "http://127.0.0.1:9180/apisix/admin/routes"; -X PUT \
       "elasticsearch-logger": {
         "endpoint_addrs": ["http://elasticsearch:9200"],
         "field": {
-          "index": "gateway",
-          "type": "logs"
+          "index": "gateway"
       }
     },
     "upstream": {
@@ -258,7 +254,6 @@ curl -i "http://127.0.0.1:9080/anything"; -H "env: dev"
 ```json
 {
   "_index": "gateway",
-  "_type": "logs",
   "_id": "Ck-WL5QBOkdYRG7kODS0",
   "_version": 1,
   "_score": 1,
@@ -289,8 +284,7 @@ curl "http://127.0.0.1:9180/apisix/admin/routes"; -X PUT \
       "elasticsearch-logger": {
         "endpoint_addrs": ["http://elasticsearch:9200"],
         "field": {
-          "index": "gateway",
-          "type": "logs"
+          "index": "gateway"
         },
         "include_req_body": true,
         "include_req_body_expr": [["arg_log_body", "==", "yes"]]
@@ -320,7 +314,6 @@ curl -i "http://127.0.0.1:9080/anything?log_body=yes"; -X 
POST -d '{"env": "dev"}
 ```json
 {
   "_index": "gateway",
-  "_type": "logs",
   "_id": "Dk-cL5QBOkdYRG7k7DSW",
   "_version": 1,
   "_score": 1,
@@ -385,7 +378,6 @@ curl -i "http://127.0.0.1:9080/anything"; -X POST -d 
'{"env": "dev"}'
 ```json
 {
   "_index": "gateway",
-  "_type": "logs",
   "_id": "EU-eL5QBOkdYRG7kUDST",
   "_version": 1,
   "_score": 1,
diff --git a/t/plugin/elasticsearch-logger.t b/t/plugin/elasticsearch-logger.t
index 83960fbac..7b081032a 100644
--- a/t/plugin/elasticsearch-logger.t
+++ b/t/plugin/elasticsearch-logger.t
@@ -46,8 +46,7 @@ __DATA__
                 {
                     endpoint_addr = "http://127.0.0.1:9200",
                     field = {
-                        index = "services",
-                        type = "collector"
+                        index = "services"
                     },
                     auth = {
                         username = "elastic",
@@ -165,6 +164,18 @@ passed
     end)
 
     http.request_uri = function(self, uri, params)
+        if params.method == "GET" then
+            return {
+                status = 200,
+                body = [[
+                {
+                    "version": {
+                        "number": "8.10.2"
+                    }
+                }
+                ]]
+            }
+        end
         if not params.body or type(params.body) ~= "string" then
             return nil, "invalid params body"
         end
@@ -293,9 +304,7 @@ GET /hello
 --- response_body
 hello world
 --- error_log
-Batch Processor[elasticsearch-logger] failed to process entries: elasticsearch 
server returned status: 401
-"reason":"missing authentication credentials for REST request [/_bulk]"
-Batch Processor[elasticsearch-logger] exceeded the max_retry_count
+failed to process entries: elasticsearch server returned status: 401
 
 
 
@@ -416,6 +425,18 @@ passed
     end)
 
     http.request_uri = function(self, uri, params)
+        if params.method == "GET" then
+            return {
+                status = 200,
+                body = [[
+                {
+                    "version": {
+                        "number": "8.10.2"
+                    }
+                }
+                ]]
+            }
+        end
         if not params.body or type(params.body) ~= "string" then
             return nil, "invalid params body"
         end
@@ -637,6 +658,18 @@ passed
     end)
 
     http.request_uri = function(self, uri, params)
+        if params.method == "GET" then
+            return {
+                status = 200,
+                body = [[
+                {
+                    "version": {
+                        "number": "8.10.2"
+                    }
+                }
+                ]]
+            }
+        end
         if not params.body or type(params.body) ~= "string" then
             return nil, "invalid params body"
         end
@@ -744,6 +777,10 @@ Action/metadata line [1] contains an unknown parameter 
[_type]
                         field = {
                             index = "services"
                         },
+                        auth = {
+                            username = "elastic",
+                            password = "123456"
+                        },
                         batch_max_size = 1,
                         inactive_timeout = 1,
                         include_req_body = true
@@ -784,6 +821,10 @@ Action/metadata line [1] contains an unknown parameter 
[_type]
                         field = {
                             index = "services"
                         },
+                        auth = {
+                            username = "elastic",
+                            password = "123456"
+                        },
                         batch_max_size = 1,
                         inactive_timeout = 1,
                         include_req_body = true,
@@ -801,3 +842,153 @@ Action/metadata line [1] contains an unknown parameter 
[_type]
     }
 --- error_log
 "body":"hello world\n"
+
+
+
+=== TEST 21: set route (auth) - check compat with version 9
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/1', ngx.HTTP_PUT, {
+                uri = "/hello",
+                upstream = {
+                    type = "roundrobin",
+                    nodes = {
+                        ["127.0.0.1:1980"] = 1
+                    }
+                },
+                plugins = {
+                    ["elasticsearch-logger"] = {
+                        endpoint_addr = "http://127.0.0.1:9301",
+                        field = {
+                            index = "services"
+                        },
+                        auth = {
+                            username = "elastic",
+                            password = "123456"
+                        },
+                        batch_max_size = 1,
+                        inactive_timeout = 1
+                    }
+                }
+            })
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+--- response_body
+passed
+
+
+
+=== TEST 22: test route (auth success)
+--- request
+GET /hello
+--- wait: 2
+--- response_body
+hello world
+--- error_log
+Batch Processor[elasticsearch-logger] successfully processed the entries
+
+
+
+=== TEST 23: set route (auth) - check compat with version 7
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/1', ngx.HTTP_PUT, {
+                uri = "/hello",
+                upstream = {
+                    type = "roundrobin",
+                    nodes = {
+                        ["127.0.0.1:1980"] = 1
+                    }
+                },
+                plugins = {
+                    ["elasticsearch-logger"] = {
+                        endpoint_addr = "http://127.0.0.1:9401",
+                        field = {
+                            index = "services"
+                        },
+                        auth = {
+                            username = "elastic",
+                            password = "123456"
+                        },
+                        batch_max_size = 1,
+                        inactive_timeout = 1
+                    }
+                }
+            })
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+--- response_body
+passed
+
+
+
+=== TEST 24: test route (auth success)
+--- request
+GET /hello
+--- wait: 2
+--- response_body
+hello world
+--- error_log
+Batch Processor[elasticsearch-logger] successfully processed the entries
+
+
+
+=== TEST 25: set route (auth) - check compat with version 6
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/1', ngx.HTTP_PUT, {
+                uri = "/hello",
+                upstream = {
+                    type = "roundrobin",
+                    nodes = {
+                        ["127.0.0.1:1980"] = 1
+                    }
+                },
+                plugins = {
+                    ["elasticsearch-logger"] = {
+                        endpoint_addr = "http://127.0.0.1:9501",
+                        field = {
+                            index = "services"
+                        },
+                        auth = {
+                            username = "elastic",
+                            password = "123456"
+                        },
+                        batch_max_size = 1,
+                        inactive_timeout = 1
+                    }
+                }
+            })
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+--- response_body
+passed
+
+
+
+=== TEST 26: test route (auth success)
+--- request
+GET /hello
+--- wait: 2
+--- response_body
+hello world
+--- error_log
+Batch Processor[elasticsearch-logger] successfully processed the entries

Reply via email to