membphis commented on code in PR #12364: URL: https://github.com/apache/apisix/pull/12364#discussion_r2167915543
########## apisix/plugins/elasticsearch-logger.lua: ########## @@ -195,6 +264,11 @@ function _M.body_filter(conf, ctx) log_util.collect_body(conf, ctx) end +function _M.access(conf) + -- set_version will call ES server only the first time + -- so this should not amount to considerable overhead + set_version(conf) Review Comment: how about `fetch_and_update_es_version`? ########## apisix/plugins/elasticsearch-logger.lua: ########## @@ -159,9 +221,16 @@ local function send_to_elasticsearch(conf, entries) local uri = selected_endpoint_addr .. "/_bulk" local body = core.table.concat(entries, "") local headers = { - ["Content-Type"] = "application/x-ndjson;compatible-with=7", - ["Accept"] = "application/vnd.elasticsearch+json;compatible-with=7" + ["Content-Type"] = "application/x-ndjson", + ["Accept"] = "application/vnd.elasticsearch+json" } + if conf._version == "8" then + headers["Content-Type"] = headers["Content-Type"] .. compat_header_7 Review Comment: can we remove `compat_header_7` and `compat_header_8`? If yes, then we can move `if conf._version == "8" then` and `elseif conf._version == "9" then`. just use the default header, pls confirm if it is ok: ``` local headers = { ["Content-Type"] = "application/x-ndjson", ["Accept"] = "application/vnd.elasticsearch+json" } ``` ########## ci/pod/docker-compose.plugin.yml: ########## @@ -225,6 +225,66 @@ services: http.port: 9201 xpack.security.enabled: 'true' + elasticsearch-noauth-2: + image: docker.elastic.co/elasticsearch/elasticsearch:9.0.2 + restart: unless-stopped + ports: + - "9400:9200" + - "9500:9300" + environment: + ES_JAVA_OPTS: -Xms512m -Xmx512m + discovery.type: single-node + xpack.security.enabled: 'false' + + elasticsearch-auth-2: + image: docker.elastic.co/elasticsearch/elasticsearch:9.0.2 + restart: unless-stopped + ports: + - "9301:9201" + environment: + ES_JAVA_OPTS: -Xms512m -Xmx512m + discovery.type: single-node + ELASTIC_USERNAME: elastic + ELASTIC_PASSWORD: 123456 + http.port: 9201 + xpack.security.enabled: 'true' + + elasticsearch-noauth-3: + image: docker.elastic.co/elasticsearch/elasticsearch:7.0.0 + restart: unless-stopped + ports: + - "9600:9200" + - "9700:9300" + environment: + ES_JAVA_OPTS: -Xms512m -Xmx512m + discovery.type: single-node + xpack.security.enabled: 'false' + + elasticsearch-auth-3: + image: docker.elastic.co/elasticsearch/elasticsearch:7.0.0 + restart: unless-stopped + ports: + - "9401:9201" + environment: + ES_JAVA_OPTS: -Xms512m -Xmx512m + discovery.type: single-node + ELASTIC_USERNAME: elastic + ELASTIC_PASSWORD: 123456 + http.port: 9201 + xpack.security.enabled: 'true' + + elasticsearch-auth-4: + image: docker.elastic.co/elasticsearch/elasticsearch:6.7.0 Review Comment: we started more instances of es the github action, is it power enough to support this? I am little worried ########## apisix/plugins/elasticsearch-logger.lua: ########## @@ -124,32 +124,94 @@ function _M.check_schema(conf, schema_type) if schema_type == core.schema.TYPE_METADATA then return core.schema.check(metadata_schema, conf) end + local check = {"endpoint_addrs"} core.utils.check_https(check, conf, plugin_name) core.utils.check_tls_bool({"ssl_verify"}, conf, plugin_name) - return core.schema.check(schema, conf) end +local function get_es_major_version(uri, conf) + local httpc = http.new() + if not httpc then + return nil, "failed to create http client" + end + local headers = {} + if conf.auth then + local authorization = "Basic " .. ngx.encode_base64( + conf.auth.username .. ":" .. conf.auth.password + ) + headers["Authorization"] = authorization + end + httpc:set_timeout(conf.timeout * 1000) + local res, err = httpc:request_uri(uri, { + ssl_verify = conf.ssl_verify, + method = "GET", + headers = headers, + }) + if not res then + return false, err + end + if res.status ~= 200 then + return nil, str_format("server returned status: %d, body: %s", + res.status, res.body or "") + end + local json_body, err = core.json.decode(res.body) + if not json_body then + return nil, "failed to decode response body: " .. err + end + if not json_body.version or not json_body.version.number then + return nil, "failed to get version from response body" + end + + local major_version = json_body.version.number:match("^(%d+)%.") + if not major_version then + return nil, "invalid version format: " .. json_body.version.number + end + + return major_version +end + + local function get_logger_entry(conf, ctx) local entry = log_util.get_log_entry(plugin_name, conf, ctx) - return core.json.encode({ - create = { - _index = conf.field.index, - _type = conf.field.type - } - }) .. "\n" .. + local body = { + index = { + _index = conf.field.index + } + } + -- for older version type is required + if conf._version == "6" or conf._version == "5" then + body.index._type = "_doc" + end + return core.json.encode(body) .. "\n" .. core.json.encode(entry) .. "\n" end +local function set_version(conf) + if not conf._version then + local selected_endpoint_addr + if conf.endpoint_addr then + selected_endpoint_addr = conf.endpoint_addr + else + selected_endpoint_addr = conf.endpoint_addrs[math_random(#conf.endpoint_addrs)] + end + local major_version, err = get_es_major_version(selected_endpoint_addr, conf) + if err then + return false, str_format("failed to get Elasticsearch version: %s", err) Review Comment: we should write error log here we do not capture the error msg in `access` phase ########## apisix/plugins/elasticsearch-logger.lua: ########## @@ -124,32 +124,94 @@ function _M.check_schema(conf, schema_type) if schema_type == core.schema.TYPE_METADATA then return core.schema.check(metadata_schema, conf) end + local check = {"endpoint_addrs"} core.utils.check_https(check, conf, plugin_name) core.utils.check_tls_bool({"ssl_verify"}, conf, plugin_name) - return core.schema.check(schema, conf) end +local function get_es_major_version(uri, conf) + local httpc = http.new() + if not httpc then + return nil, "failed to create http client" + end + local headers = {} + if conf.auth then + local authorization = "Basic " .. ngx.encode_base64( + conf.auth.username .. ":" .. conf.auth.password + ) + headers["Authorization"] = authorization + end + httpc:set_timeout(conf.timeout * 1000) + local res, err = httpc:request_uri(uri, { + ssl_verify = conf.ssl_verify, + method = "GET", + headers = headers, + }) + if not res then + return false, err + end + if res.status ~= 200 then + return nil, str_format("server returned status: %d, body: %s", + res.status, res.body or "") + end + local json_body, err = core.json.decode(res.body) + if not json_body then + return nil, "failed to decode response body: " .. err + end + if not json_body.version or not json_body.version.number then + return nil, "failed to get version from response body" + end + + local major_version = json_body.version.number:match("^(%d+)%.") + if not major_version then + return nil, "invalid version format: " .. json_body.version.number + end + + return major_version +end + + local function get_logger_entry(conf, ctx) local entry = log_util.get_log_entry(plugin_name, conf, ctx) - return core.json.encode({ - create = { - _index = conf.field.index, - _type = conf.field.type - } - }) .. "\n" .. + local body = { + index = { + _index = conf.field.index + } + } + -- for older version type is required + if conf._version == "6" or conf._version == "5" then + body.index._type = "_doc" + end + return core.json.encode(body) .. "\n" .. core.json.encode(entry) .. "\n" end +local function set_version(conf) Review Comment: ```lua local function set_version(conf) if conf._version then return end local selected_endpoint_addr if conf.endpoint_addr then selected_endpoint_addr = conf.endpoint_addr else selected_endpoint_addr = conf.endpoint_addrs[math_random(#conf.endpoint_addrs)] end local major_version, err = get_es_major_version(selected_endpoint_addr, conf) if err then return false, str_format("failed to get Elasticsearch version: %s", err) end conf._version = major_version end ########## apisix/plugins/elasticsearch-logger.lua: ########## @@ -159,9 +221,16 @@ local function send_to_elasticsearch(conf, entries) local uri = selected_endpoint_addr .. "/_bulk" local body = core.table.concat(entries, "") local headers = { - ["Content-Type"] = "application/x-ndjson;compatible-with=7", - ["Accept"] = "application/vnd.elasticsearch+json;compatible-with=7" + ["Content-Type"] = "application/x-ndjson", + ["Accept"] = "application/vnd.elasticsearch+json" } + if conf._version == "8" then + headers["Content-Type"] = headers["Content-Type"] .. compat_header_7 Review Comment: I do not like `if conf._version == "8" then ... elseif conf._version == "9" then ... end` style -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@apisix.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org