nic-6443 commented on code in PR #13230: URL: https://github.com/apache/apisix/pull/13230#discussion_r3090726507
########## apisix/discovery/consul/client.lua: ########## @@ -0,0 +1,479 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- + +--- Reusable HTTP client primitives for Consul service discovery. +--- Extracted from init.lua so that both static-config mode and +--- dynamic-config mode can share the same core logic. + +local require = require +local core = require("apisix.core") +local core_sleep = require("apisix.core.utils").sleep +local resty_consul = require('resty.consul') +local http = require('resty.http') +local ipairs = ipairs +local pairs = pairs +local unpack = unpack +local tonumber = tonumber +local type = type +local next = next +local ngx = ngx +local math_random = math.random +local log = core.log +local json_delay_encode = core.json.delay_encode +local pcall = pcall +local null = ngx.null + +local _M = {} + +local default_random_range = 5 +local default_catalog_error_index = -1 +local default_health_error_index = -2 +local watch_type_catalog = 1 +local watch_type_health = 2 +local max_retry_time = 256 + + +-- ─── helpers ────────────────────────────────────────────────────────── + +function _M.get_retry_delay(retry_delay) + if not retry_delay or retry_delay >= max_retry_time then + retry_delay = 1 + else + retry_delay = retry_delay * 4 + end + + return retry_delay +end + + +local function is_not_empty(value) + if value == nil or value == null + or (type(value) == "table" and not next(value)) + or (type(value) == "string" and value == "") + then + return false + end + + return true +end + + +-- ─── sort comparators ───────────────────────────────────────────────── + +local function combine_sort_nodes_cmp(left, right) + if left.host ~= right.host then + return left.host < right.host + end + + return left.port < right.port +end + + +local function port_sort_nodes_cmp(left, right) + return left.port < right.port +end + + +local function host_sort_nodes_cmp(left, right) + return left.host < right.host +end + + +function _M.sort_nodes(nodes, sort_type) + if not nodes or not sort_type or sort_type == "origin" then + return + end + + if sort_type == "port_sort" then + core.table.sort(nodes, port_sort_nodes_cmp) + elseif sort_type == "host_sort" then + core.table.sort(nodes, host_sort_nodes_cmp) + elseif sort_type == "combine_sort" then + core.table.sort(nodes, combine_sort_nodes_cmp) + end +end + + +-- ─── resty.consul options ───────────────────────────────────────────── + +local function get_opts(consul_server, is_catalog) + local opts = { + host = consul_server.host, + port = consul_server.port, + connect_timeout = consul_server.connect_timeout, + read_timeout = consul_server.read_timeout, + default_args = { + token = consul_server.token, + } + } + if not consul_server.keepalive then + return opts + end + + opts.default_args.wait = consul_server.wait_timeout + + if is_catalog then + opts.default_args.index = consul_server.catalog_index + else + opts.default_args.index = consul_server.health_index + end + + return opts +end + + +-- ─── blocking query watchers ────────────────────────────────────────── + +function _M.watch_catalog(consul_server) + local client = resty_consul:new(get_opts(consul_server, true)) + + ::RETRY:: + local watch_result, watch_err = client:get(consul_server.consul_watch_catalog_url) + local watch_error_info = (watch_err ~= nil and watch_err) + or ((watch_result ~= nil and watch_result.status ~= 200) + and watch_result.status) + if watch_error_info then + log.error("connect consul: ", consul_server.consul_server_url, + " by sub url: ", consul_server.consul_watch_catalog_url, + ", got watch result: ", json_delay_encode(watch_result), + ", with error: ", watch_error_info) + + return watch_type_catalog, default_catalog_error_index + end + + if consul_server.catalog_index > 0 + and consul_server.catalog_index == tonumber(watch_result.headers['X-Consul-Index']) then + local random_delay = math_random(default_random_range) + log.info("watch catalog has no change, re-watch consul after ", random_delay, " seconds") + core_sleep(random_delay) + goto RETRY + end + + return watch_type_catalog, watch_result.headers['X-Consul-Index'] +end + + +function _M.watch_health(consul_server) + local client = resty_consul:new(get_opts(consul_server, false)) + + ::RETRY:: + local watch_result, watch_err = client:get(consul_server.consul_watch_health_url) + local watch_error_info = (watch_err ~= nil and watch_err) + or ((watch_result ~= nil and watch_result.status ~= 200) + and watch_result.status) + if watch_error_info then + log.error("connect consul: ", consul_server.consul_server_url, + " by sub url: ", consul_server.consul_watch_health_url, + ", got watch result: ", json_delay_encode(watch_result), + ", with error: ", watch_error_info) + + return watch_type_health, default_health_error_index + end + + if consul_server.health_index > 0 + and consul_server.health_index == tonumber(watch_result.headers['X-Consul-Index']) then + local random_delay = math_random(default_random_range) + log.info("watch health has no change, re-watch consul after ", random_delay, " seconds") + core_sleep(random_delay) + goto RETRY + end + + return watch_type_health, watch_result.headers['X-Consul-Index'] +end + + +function _M.watch_result_is_valid(watch_type, index, catalog_index, health_index) + if index <= 0 then + return false + end + + if watch_type == watch_type_catalog then + if index == catalog_index then + return false + end + else + if index == health_index then + return false + end + end + + return true +end + + +function _M.update_index(consul_server, catalog_index, health_index) + local c_index = 0 + local h_index = 0 + if catalog_index ~= nil then + c_index = tonumber(catalog_index) + end + + if health_index ~= nil then + h_index = tonumber(health_index) + end + + if c_index > 0 then + consul_server.catalog_index = c_index + end + + if h_index > 0 then + consul_server.health_index = h_index + end +end + + +-- ─── URL parsing ────────────────────────────────────────────────────── + +function _M.format_consul_params(consul_conf) + local servers = consul_conf.servers + local consul_server_list = core.table.new(0, #servers) + + for _, v in pairs(servers) do + local scheme, host, port, path = unpack(http.parse_uri(nil, v)) + if scheme ~= "http" then + return nil, "only support consul http schema address, eg: http://address:port" + elseif path ~= "/" or core.string.has_suffix(v, '/') then + return nil, "invalid consul server address, the valid format: http://address:port" + end + core.table.insert(consul_server_list, { + host = host, + port = port, + token = consul_conf.token, + connect_timeout = consul_conf.timeout.connect, + read_timeout = consul_conf.timeout.read, + wait_timeout = consul_conf.timeout.wait, + consul_watch_catalog_url = "/catalog/services", + consul_sub_url = "/health/service", + consul_watch_health_url = "/health/state/any", + consul_server_url = v .. "/v1", + weight = consul_conf.weight, + keepalive = consul_conf.keepalive, + health_index = 0, + catalog_index = 0, + fetch_interval = consul_conf.fetch_interval, + }) + end + return consul_server_list, nil +end + + +-- ─── service fetching ───────────────────────────────────────────────── + +--- Fetch all services from a single consul server. +--- Returns: up_services (table of service_name -> nodes), catalog_index, health_index +--- On failure: nil, err_string +--- +--- options: +--- default_weight (number) default node weight +--- sort_type (string) "origin"/"host_sort"/"port_sort"/"combine_sort" +--- skip_service_map (table) set of service names to skip +--- preserve_metadata (bool) include Service.Meta in returned nodes +--- key_builder (function) key_builder(service_name) -> string for the result key +function _M.fetch_services_from_server(consul_server, options) Review Comment: Fixed — updated docstring to document all 4 return values. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
