membphis commented on code in PR #12364:
URL: https://github.com/apache/apisix/pull/12364#discussion_r2167915543


##########
apisix/plugins/elasticsearch-logger.lua:
##########
@@ -195,6 +264,11 @@ function _M.body_filter(conf, ctx)
     log_util.collect_body(conf, ctx)
 end
 
+function _M.access(conf)
+    -- set_version will call ES server only the first time
+    -- so this should not amount to considerable overhead
+    set_version(conf)

Review Comment:
   how about `fetch_and_update_es_version`?



##########
apisix/plugins/elasticsearch-logger.lua:
##########
@@ -159,9 +221,16 @@ local function send_to_elasticsearch(conf, entries)
     local uri = selected_endpoint_addr .. "/_bulk"
     local body = core.table.concat(entries, "")
     local headers = {
-        ["Content-Type"] = "application/x-ndjson;compatible-with=7",
-        ["Accept"] = "application/vnd.elasticsearch+json;compatible-with=7"
+        ["Content-Type"] = "application/x-ndjson",
+        ["Accept"] = "application/vnd.elasticsearch+json"
     }
+    if conf._version == "8" then
+        headers["Content-Type"] = headers["Content-Type"] .. compat_header_7

Review Comment:
   can we remove `compat_header_7` and `compat_header_8`?
   
   If yes, then we can move `if conf._version == "8" then` and `elseif 
conf._version == "9" then`.
   
   just use the default header, pls confirm if it is ok:
   
   ```
   local headers = {
           ["Content-Type"] = "application/x-ndjson",
           ["Accept"] = "application/vnd.elasticsearch+json"
   }
   ```



##########
ci/pod/docker-compose.plugin.yml:
##########
@@ -225,6 +225,66 @@ services:
       http.port: 9201
       xpack.security.enabled: 'true'
 
+  elasticsearch-noauth-2:
+    image: docker.elastic.co/elasticsearch/elasticsearch:9.0.2
+    restart: unless-stopped
+    ports:
+      - "9400:9200"
+      - "9500:9300"
+    environment:
+      ES_JAVA_OPTS: -Xms512m -Xmx512m
+      discovery.type: single-node
+      xpack.security.enabled: 'false'
+
+  elasticsearch-auth-2:
+    image: docker.elastic.co/elasticsearch/elasticsearch:9.0.2
+    restart: unless-stopped
+    ports:
+      - "9301:9201"
+    environment:
+      ES_JAVA_OPTS: -Xms512m -Xmx512m
+      discovery.type: single-node
+      ELASTIC_USERNAME: elastic
+      ELASTIC_PASSWORD: 123456
+      http.port: 9201
+      xpack.security.enabled: 'true'
+
+  elasticsearch-noauth-3:
+    image: docker.elastic.co/elasticsearch/elasticsearch:7.0.0
+    restart: unless-stopped
+    ports:
+      - "9600:9200"
+      - "9700:9300"
+    environment:
+      ES_JAVA_OPTS: -Xms512m -Xmx512m
+      discovery.type: single-node
+      xpack.security.enabled: 'false'
+
+  elasticsearch-auth-3:
+    image: docker.elastic.co/elasticsearch/elasticsearch:7.0.0
+    restart: unless-stopped
+    ports:
+      - "9401:9201"
+    environment:
+      ES_JAVA_OPTS: -Xms512m -Xmx512m
+      discovery.type: single-node
+      ELASTIC_USERNAME: elastic
+      ELASTIC_PASSWORD: 123456
+      http.port: 9201
+      xpack.security.enabled: 'true'
+
+  elasticsearch-auth-4:
+    image: docker.elastic.co/elasticsearch/elasticsearch:6.7.0

Review Comment:
   we started more instances of es
   
   the github action, is it power enough to support this? I am little worried



##########
apisix/plugins/elasticsearch-logger.lua:
##########
@@ -124,32 +124,94 @@ function _M.check_schema(conf, schema_type)
     if schema_type == core.schema.TYPE_METADATA then
         return core.schema.check(metadata_schema, conf)
     end
+
     local check = {"endpoint_addrs"}
     core.utils.check_https(check, conf, plugin_name)
     core.utils.check_tls_bool({"ssl_verify"}, conf, plugin_name)
-
     return core.schema.check(schema, conf)
 end
 
 
+local function get_es_major_version(uri, conf)
+    local httpc = http.new()
+    if not httpc then
+        return nil, "failed to create http client"
+    end
+    local headers = {}
+    if conf.auth then
+        local authorization = "Basic " .. ngx.encode_base64(
+            conf.auth.username .. ":" .. conf.auth.password
+        )
+        headers["Authorization"] = authorization
+    end
+    httpc:set_timeout(conf.timeout * 1000)
+    local res, err = httpc:request_uri(uri, {
+        ssl_verify = conf.ssl_verify,
+        method = "GET",
+        headers = headers,
+    })
+    if not res then
+        return false, err
+    end
+    if res.status ~= 200 then
+        return nil, str_format("server returned status: %d, body: %s",
+            res.status, res.body or "")
+    end
+    local json_body, err = core.json.decode(res.body)
+    if not json_body then
+        return nil, "failed to decode response body: " .. err
+    end
+    if not json_body.version or not json_body.version.number then
+        return nil, "failed to get version from response body"
+    end
+
+    local major_version = json_body.version.number:match("^(%d+)%.")
+    if not major_version then
+        return nil, "invalid version format: " .. json_body.version.number
+    end
+
+    return major_version
+end
+
+
 local function get_logger_entry(conf, ctx)
     local entry = log_util.get_log_entry(plugin_name, conf, ctx)
-    return core.json.encode({
-            create = {
-                _index = conf.field.index,
-                _type = conf.field.type
-            }
-        }) .. "\n" ..
+    local body = {
+        index = {
+            _index = conf.field.index
+        }
+    }
+    -- for older version type is required
+    if conf._version == "6" or conf._version == "5" then
+        body.index._type = "_doc"
+    end
+    return core.json.encode(body) .. "\n" ..
         core.json.encode(entry) .. "\n"
 end
 
+local function set_version(conf)
+    if not conf._version then
+        local selected_endpoint_addr
+        if conf.endpoint_addr then
+            selected_endpoint_addr = conf.endpoint_addr
+        else
+            selected_endpoint_addr = 
conf.endpoint_addrs[math_random(#conf.endpoint_addrs)]
+        end
+        local major_version, err = 
get_es_major_version(selected_endpoint_addr, conf)
+        if err then
+            return false, str_format("failed to get Elasticsearch version: 
%s", err)

Review Comment:
   we should write error log here
   
   we do not capture the error msg in `access` phase



##########
apisix/plugins/elasticsearch-logger.lua:
##########
@@ -124,32 +124,94 @@ function _M.check_schema(conf, schema_type)
     if schema_type == core.schema.TYPE_METADATA then
         return core.schema.check(metadata_schema, conf)
     end
+
     local check = {"endpoint_addrs"}
     core.utils.check_https(check, conf, plugin_name)
     core.utils.check_tls_bool({"ssl_verify"}, conf, plugin_name)
-
     return core.schema.check(schema, conf)
 end
 
 
+local function get_es_major_version(uri, conf)
+    local httpc = http.new()
+    if not httpc then
+        return nil, "failed to create http client"
+    end
+    local headers = {}
+    if conf.auth then
+        local authorization = "Basic " .. ngx.encode_base64(
+            conf.auth.username .. ":" .. conf.auth.password
+        )
+        headers["Authorization"] = authorization
+    end
+    httpc:set_timeout(conf.timeout * 1000)
+    local res, err = httpc:request_uri(uri, {
+        ssl_verify = conf.ssl_verify,
+        method = "GET",
+        headers = headers,
+    })
+    if not res then
+        return false, err
+    end
+    if res.status ~= 200 then
+        return nil, str_format("server returned status: %d, body: %s",
+            res.status, res.body or "")
+    end
+    local json_body, err = core.json.decode(res.body)
+    if not json_body then
+        return nil, "failed to decode response body: " .. err
+    end
+    if not json_body.version or not json_body.version.number then
+        return nil, "failed to get version from response body"
+    end
+
+    local major_version = json_body.version.number:match("^(%d+)%.")
+    if not major_version then
+        return nil, "invalid version format: " .. json_body.version.number
+    end
+
+    return major_version
+end
+
+
 local function get_logger_entry(conf, ctx)
     local entry = log_util.get_log_entry(plugin_name, conf, ctx)
-    return core.json.encode({
-            create = {
-                _index = conf.field.index,
-                _type = conf.field.type
-            }
-        }) .. "\n" ..
+    local body = {
+        index = {
+            _index = conf.field.index
+        }
+    }
+    -- for older version type is required
+    if conf._version == "6" or conf._version == "5" then
+        body.index._type = "_doc"
+    end
+    return core.json.encode(body) .. "\n" ..
         core.json.encode(entry) .. "\n"
 end
 
+local function set_version(conf)

Review Comment:
   ```lua
   local function set_version(conf)
       if conf._version then
           return
       end
   
           local selected_endpoint_addr
           if conf.endpoint_addr then
               selected_endpoint_addr = conf.endpoint_addr
           else
               selected_endpoint_addr = 
conf.endpoint_addrs[math_random(#conf.endpoint_addrs)]
           end
           local major_version, err = 
get_es_major_version(selected_endpoint_addr, conf)
           if err then
               return false, str_format("failed to get Elasticsearch version: 
%s", err)
           end
           conf._version = major_version
   end



##########
apisix/plugins/elasticsearch-logger.lua:
##########
@@ -159,9 +221,16 @@ local function send_to_elasticsearch(conf, entries)
     local uri = selected_endpoint_addr .. "/_bulk"
     local body = core.table.concat(entries, "")
     local headers = {
-        ["Content-Type"] = "application/x-ndjson;compatible-with=7",
-        ["Accept"] = "application/vnd.elasticsearch+json;compatible-with=7"
+        ["Content-Type"] = "application/x-ndjson",
+        ["Accept"] = "application/vnd.elasticsearch+json"
     }
+    if conf._version == "8" then
+        headers["Content-Type"] = headers["Content-Type"] .. compat_header_7

Review Comment:
   I do not like `if conf._version == "8" then ... elseif conf._version == "9" 
then ... end` style



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@apisix.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to