This is an automated email from the ASF dual-hosted git repository.

membphis pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix.git


The following commit(s) were added to refs/heads/master by this push:
     new 98bb593  feature: implement `ewma` balancer for upstream node (#2001)
98bb593 is described below

commit 98bb5933d64229ddf8e3dd0fdf1c31f90072c2cb
Author: redynasc <ifonly....@gmail.com>
AuthorDate: Sat Aug 29 23:14:16 2020 +0800

    feature: implement `ewma` balancer for upstream node (#2001)
    
    ewma is a balancing algorithm that scores every backend IP with an
    exponentially weighted moving average (EWMA) of its recent response
    times and dispatches each request to the backend with the lowest score,
    on the assumption that backends that reply faster are less loaded.
    
    fix #1996
---
 LICENSE                  |  11 +++
 apisix/balancer.lua      |   4 +-
 apisix/balancer/ewma.lua | 191 +++++++++++++++++++++++++++++++++++++++++
 apisix/init.lua          |   4 +
 apisix/schema_def.lua    |   2 +-
 bin/apisix               |   3 +
 doc/admin-api.md         |   2 +-
 t/APISIX.pm              |   3 +
 t/lib/server.lua         |  10 +++
 t/node/ewma.t            | 217 +++++++++++++++++++++++++++++++++++++++++++++++
 10 files changed, 443 insertions(+), 4 deletions(-)

diff --git a/LICENSE b/LICENSE
index 261eeb9..2351cfd 100644
--- a/LICENSE
+++ b/LICENSE
@@ -199,3 +199,14 @@
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
+
+=======================================================================
+Apache APISIX Subcomponents:
+
+The Apache APISIX project contains subcomponents with separate copyright
+notices and license terms. Your use of the source code for these
+subcomponents is subject to the terms and conditions of the following
+licenses.
+
+   ewma.lua file from kubernetes/ingress-nginx: https://github.com/kubernetes/ingress-nginx Apache 2.0
+
diff --git a/apisix/balancer.lua b/apisix/balancer.lua
index 36f4f32..cfe53b9 100644
--- a/apisix/balancer.lua
+++ b/apisix/balancer.lua
@@ -30,9 +30,9 @@ local module_name = "balancer"
 local pickers = {
     roundrobin = require("apisix.balancer.roundrobin"),
     chash = require("apisix.balancer.chash"),
+    -- ewma: picks the node with the lowest decayed response time
+    ewma = require("apisix.balancer.ewma"),
 }
 
-
 local lrucache_server_picker = core.lrucache.new({
     ttl = 300, count = 256
 })
@@ -245,7 +245,7 @@ local function pick_server(route, ctx)
         core.log.error("failed to parse server addr: ", server, " err: ", err)
         return core.response.exit(502)
     end
-
+    ctx.server_picker = server_picker
     return res
 end
 
diff --git a/apisix/balancer/ewma.lua b/apisix/balancer/ewma.lua
new file mode 100644
index 0000000..ba429d9
--- /dev/null
+++ b/apisix/balancer/ewma.lua
@@ -0,0 +1,191 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+
+local core = require("apisix.core")
+local ngx = ngx
+local ngx_shared = ngx.shared
+local ngx_now = ngx.now
+local math = math
+local pairs = pairs
+local next = next
+local tonumber = tonumber
+
+local _M = {}
+-- Time constant of the exponential decay used by decay_ewma(): a stored
+-- average keeps a share of exp(-elapsed / DECAY_TIME) of its old value.
+local DECAY_TIME = 10 -- this value is in seconds
+
+-- Shared dicts holding, per "host:port" key, the latest average and the
+-- time it was written. Either may be nil if the matching lua_shared_dict
+-- is not configured; _M.new() refuses to build a picker in that case.
+local shm_ewma = ngx_shared.balancer_ewma
+local shm_last_touched_at= ngx_shared.balancer_ewma_last_touched_at
+
+-- Caches the result of parse_addr() per raw "host:port" node address.
+local lrucache_addr = core.lrucache.new({
+    ttl = 300, count = 1024
+})
+-- Caches the peer list built by _trans_format(), keyed by
+-- ctx.upstream_key and versioned by ctx.upstream_version (see _ewma_find).
+local lrucache_trans_format = core.lrucache.new({
+    ttl = 300, count = 256
+})
+
+
+-- Blend a stored average with a new sample `rtt`. The stored value keeps
+-- a share of exp(-elapsed / DECAY_TIME), so the older it is the less it
+-- counts; a last_touched_at in the future is treated as "just now".
+local function decay_ewma(ewma, last_touched_at, rtt, now)
+    local elapsed = math.max(now - last_touched_at, 0)
+    local keep = math.exp(-elapsed / DECAY_TIME)
+    return ewma * keep + rtt * (1.0 - keep)
+end
+
+
+-- Write `value` under `key` in a shared dict. A failed write is logged as
+-- an error and an eviction of still-valid entries as a warning; neither
+-- raises, so recording statistics stays best-effort.
+local function shm_set(dict, dict_name, key, value)
+    local ok, err, forcible = dict:set(key, value)
+    if not ok then
+        core.log.error(dict_name, ":set failed ", err)
+    end
+    if forcible then
+        core.log.warn(dict_name, ":set valid items forcibly overwritten")
+    end
+end
+
+
+-- Persist the latest average for `upstream` ("host:port") together with
+-- the time it was computed, so later readers can decay it correctly.
+local function store_stats(upstream, ewma, now)
+    shm_set(shm_last_touched_at, "balancer_ewma_last_touched_at",
+            upstream, now)
+    shm_set(shm_ewma, "balancer_ewma", upstream, ewma)
+end
+
+
+-- Return the decayed EWMA for `upstream` after folding in the sample `rtt`.
+-- When `update` is true the new value and the current time are persisted
+-- via store_stats(); otherwise the shared dicts are only read.
+local function get_or_update_ewma(upstream, rtt, update)
+    local previous = shm_ewma:get(upstream) or 0
+    local now = ngx_now()
+    local touched_at = shm_last_touched_at:get(upstream) or 0
+    local current = decay_ewma(previous, touched_at, rtt, now)
+
+    if update then
+        store_stats(upstream, current, now)
+    end
+
+    return current
+end
+
+
+-- Read-only score of a peer: its stored average decayed to "now" with a
+-- zero-latency sample, so a peer that has been idle drifts toward 0.
+-- The key is "host:port" (the original ingress-nginx code keyed by name).
+-- NOTE(review): after_balance writes under $upstream_addr; presumably the
+-- same form, but IPv6 "[addr]:port" keys may not match — confirm.
+local function score(peer)
+    local key = peer.host .. ":" .. peer.port
+    return get_or_update_ewma(key, 0, false)
+end
+
+
+-- Scan all peers and return the one with the lowest score plus that score.
+-- Ties keep the earliest peer in the list.
+local function pick_and_score(peers)
+    local best_peer = peers[1]
+    local best_score = score(best_peer)
+
+    for idx = 2, #peers do
+        local peer = peers[idx]
+        local peer_score = score(peer)
+        if peer_score < best_score then
+            best_peer, best_score = peer, peer_score
+        end
+    end
+
+    return best_peer, best_score
+end
+
+
+-- Split a "host:port" node address into {host = ..., port = ...}.
+-- Returns nil plus an error message when the address cannot be parsed
+-- into both parts, instead of a half-filled table: such a table would
+-- later break the `host .. ":" .. port` concatenation, and returning nil
+-- keeps it out of the lrucache wrapper used by _trans_format.
+-- NOTE(review): assumes core.lrucache does not cache nil results — confirm.
+local function parse_addr(addr)
+    local host, port, err = core.utils.parse_addr(addr)
+    if err or not host or not port then
+        return nil, err or ("invalid upstream node address: " .. addr)
+    end
+
+    return {host = host, port = port}
+end
+
+
+local function _trans_format(up_nodes)
+    -- trans
+    --{"1.2.3.4:80":100,"5.6.7.8:8080":100}
+    -- into
+    -- [{"host":"1.2.3.4","port":"80"},{"host":"5.6.7.8","port":"8080"}]
+    -- (the port's type is whatever core.utils.parse_addr returns)
+    --
+    -- Nodes with weight 0 are skipped: per the admin API docs a zero weight
+    -- marks a node that should never be selected, and the other pickers
+    -- honour that. Addresses that cannot be parsed into host and port are
+    -- logged and skipped. Returns nil when no usable peer remains.
+    local peers = {}
+
+    for addr, weight in pairs(up_nodes) do
+        if weight ~= 0 then
+            local res, err = lrucache_addr(addr, nil, parse_addr, addr)
+            if res and not err and res.host and res.port then
+                core.table.insert(peers, res)
+            else
+                core.log.error('parse_addr error: ', addr, err)
+            end
+        end
+    end
+
+    return next(peers) and peers or nil
+end
+
+
+-- Choose a server for this request from `up_nodes`.
+-- Returns "host:port" on success, or nil plus a short reason when there
+-- are no nodes or none of them could be turned into a peer.
+local function _ewma_find(ctx, up_nodes)
+    if not up_nodes or core.table.nkeys(up_nodes) == 0 then
+        return nil, 'up_nodes empty'
+    end
+
+    local peers = lrucache_trans_format(ctx.upstream_key,
+                                        ctx.upstream_version,
+                                        _trans_format, up_nodes)
+    if not peers then
+        return nil, 'up_nodes trans error'
+    end
+
+    -- a lone peer needs no scoring, so skip the shared-dict reads
+    local chosen = peers[1]
+    if #peers > 1 then
+        chosen = pick_and_score(peers)
+    end
+
+    return chosen.host .. ":" .. chosen.port
+end
+
+
+-- nginx reports one value per upstream attempt in $upstream_addr,
+-- $upstream_connect_time and $upstream_response_time, joined with ", "
+-- (retries) or " : " (internal redirects). Return only the last entry,
+-- i.e. the attempt that produced the final response, or nil if none.
+local function last_upstream_value(value)
+    if not value then
+        return nil
+    end
+
+    return value:match("([^,%s]+)$")
+end
+
+
+-- Log-phase hook: fold the round-trip time (connect + response) observed
+-- for the upstream that served this request into that upstream's EWMA.
+-- Returns the updated average, or nil plus a reason when no upstream
+-- address was recorded.
+local function _ewma_after_balance(ctx)
+    local response_time = tonumber(
+        last_upstream_value(ctx.var.upstream_response_time)) or 0
+    local connect_time = tonumber(
+        last_upstream_value(ctx.var.upstream_connect_time)) or 0
+    local rtt = connect_time + response_time
+    local upstream = last_upstream_value(ctx.var.upstream_addr)
+
+    if not upstream then
+        return nil, "no upstream addr found"
+    end
+
+    return get_or_update_ewma(upstream, rtt, true)
+end
+
+
+-- Build an ewma server picker for `up_nodes`. The returned table exposes
+-- get(ctx), which yields the chosen "host:port" (or nil, err), and
+-- after_balance(ctx), which records the observed latency. Returns nil and
+-- "dictionary not find" when either shared dict is unavailable.
+function _M.new(up_nodes, upstream)
+    if not (shm_ewma and shm_last_touched_at) then
+        return nil, "dictionary not find"
+    end
+
+    local picker = {
+        upstream = upstream,
+        after_balance = _ewma_after_balance,
+    }
+    picker.get = function (ctx)
+        return _ewma_find(ctx, up_nodes)
+    end
+
+    return picker
+end
+
+
+return _M
diff --git a/apisix/init.lua b/apisix/init.lua
index fddc0af..3a69534 100644
--- a/apisix/init.lua
+++ b/apisix/init.lua
@@ -651,6 +651,10 @@ function _M.http_log_phase()
     local api_ctx = common_phase("log")
     healcheck_passive(api_ctx)
 
+    if api_ctx.server_picker and api_ctx.server_picker.after_balance then
+        api_ctx.server_picker.after_balance(api_ctx)
+    end
+
     if api_ctx.uri_parse_param then
         core.tablepool.release("uri_parse_param", api_ctx.uri_parse_param)
     end
diff --git a/apisix/schema_def.lua b/apisix/schema_def.lua
index ef234c0..a6656b2 100644
--- a/apisix/schema_def.lua
+++ b/apisix/schema_def.lua
@@ -311,7 +311,7 @@ local upstream_schema = {
         type = {
             description = "algorithms of load balancing",
             type = "string",
-            enum = {"chash", "roundrobin"}
+            enum = {"chash", "roundrobin", "ewma"}
         },
         checks = health_checker,
         hash_on = {
diff --git a/bin/apisix b/bin/apisix
index 4d53ddb..9573f3b 100755
--- a/bin/apisix
+++ b/bin/apisix
@@ -181,6 +181,9 @@ http {
     lua_shared_dict worker-events        10m;
     lua_shared_dict lrucache-lock        10m;
     lua_shared_dict skywalking-tracing-buffer    100m;
+    lua_shared_dict balancer_ewma        10m;
+    lua_shared_dict balancer_ewma_locks  10m;
+    lua_shared_dict balancer_ewma_last_touched_at 10m;
 
     # for openid-connect plugin
    lua_shared_dict discovery             1m; # cache for discovery metadata documents
diff --git a/doc/admin-api.md b/doc/admin-api.md
index 9a3def9..3b8ac89 100644
--- a/doc/admin-api.md
+++ b/doc/admin-api.md
@@ -493,7 +493,7 @@ In addition to the basic complex equalization algorithm selection, APISIX's Upst
 
 |Name    |Optional|Description|
 |-------         |-----|------|
-|type            |required|`roundrobin` supports the weight of the load, `chash` consistency hash, pick one of them.|
+|type            |required|Load balancing algorithm; pick one of them: `roundrobin` weighted round robin, `chash` consistent hashing, or `ewma` minimum latency (see https://en.wikipedia.org/wiki/EWMA_chart for details about EWMA).|
 |nodes           |required if `k8s_deployment_info` not configured|Hash table, the key of the internal element is the upstream machine address list, the format is `Address + Port`, where the address part can be IP or domain name, such as `192.168.1.100:80`, `foo.com:80`, etc. Value is the weight of the node. In particular, when the weight value is `0`, it has a special meaning, which usually means that the upstream node is invalid and never wants to be selected.|
 |k8s_deployment_info|required if `nodes` not configured|fields: `namespace`、`deploy_name`、`service_name`、`port`、`backend_type`, `port` is number, `backend_type` is `pod` or `service`, others is string. |
 |hash_on         |optional|This option is only valid if the `type` is `chash`. Supported types `vars`(Nginx variables), `header`(custom header), `cookie`, `consumer`, the default value is `vars`.|
diff --git a/t/APISIX.pm b/t/APISIX.pm
index 116ac98..7434d43 100644
--- a/t/APISIX.pm
+++ b/t/APISIX.pm
@@ -220,6 +220,9 @@ _EOC_
     lua_shared_dict worker-events        10m;
     lua_shared_dict lrucache-lock        10m;
     lua_shared_dict skywalking-tracing-buffer    100m;
+    lua_shared_dict balancer_ewma         1m;
+    lua_shared_dict balancer_ewma_locks   1m;
+    lua_shared_dict balancer_ewma_last_touched_at  1m;
 
     resolver $dns_addrs_str;
     resolver_timeout 5;
diff --git a/t/lib/server.lua b/t/lib/server.lua
index 864a8e2..b9c8db4 100644
--- a/t/lib/server.lua
+++ b/t/lib/server.lua
@@ -68,6 +68,16 @@ function _M.sleep1()
     ngx.say("ok")
 end
 
+function _M.ewma()
+    -- Upstream handler for t/node/ewma.t: ports 1981 and 1982 answer after
+    -- 0.2s, every other port after 0.1s; the body is the serving port.
+    local port = ngx.var.server_port
+    local delay = 0.1
+    if port == "1981" or port == "1982" then
+        delay = 0.2
+    end
+    ngx.sleep(delay)
+    ngx.print(port)
+end
+
 function _M.uri()
     -- ngx.sleep(1)
     ngx.say("uri: ", ngx.var.uri)
diff --git a/t/node/ewma.t b/t/node/ewma.t
new file mode 100644
index 0000000..666ffec
--- /dev/null
+++ b/t/node/ewma.t
@@ -0,0 +1,217 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+use t::APISIX 'no_plan';
+
+repeat_each(1);
+#no_long_string();
+no_root_location();
+log_level('info');
+run_tests;
+
+__DATA__
+
+=== TEST 1: add upstream
+--- config
+    location /t {
+        content_by_lua_block {
+            -- PUT route 1 with an ewma upstream over nodes 1980 and 1981.
+            -- The second JSON document is what lib.test_admin compares the
+            -- admin API response against (presumably; see lib/test_admin).
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/1',
+                 ngx.HTTP_PUT,
+                 [[{
+                        "upstream": {
+                            "nodes": {
+                                "127.0.0.1:1980": 100,
+                                "127.0.0.1:1981": 100
+                            },
+                            "type": "ewma"
+                        },
+                        "uri": "/ewma"
+                }]],
+                [[{
+                    "node": {
+                        "value": {
+                            "upstream": {
+                                "nodes": {
+                                    "127.0.0.1:1980": 100,
+                                    "127.0.0.1:1981": 100
+                                },
+                                "type": "ewma"
+                            },
+                            "uri": "/ewma"
+                        },
+                        "key": "/apisix/routes/1"
+                    },
+                    "action": "set"
+                }]]
+                )
+
+            -- surface a failing admin call as the response status
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+--- request
+GET /t
+--- response_body
+passed
+--- no_error_log
+[error]
+
+
+
+=== TEST 2: about latency
+--- timeout: 5
+--- config
+    location /t {
+        content_by_lua_block {
+            -- Send 12 sequential requests through the ewma route and count
+            -- which upstream port served each (the /ewma handler in
+            -- t/lib/server.lua presumably answers with its own port):
+            --node: "127.0.0.1:1980": latency is about 0.1s
+            --node: "127.0.0.1:1981": latency is about 0.2s
+            -- so the faster 1980 node is expected to win most requests.
+            local http = require "resty.http"
+            local uri = "http://127.0.0.1:" .. ngx.var.server_port
+                        .. "/ewma"
+
+            local ports_count = {}
+            for _ = 1, 12 do
+                local httpc = http.new()
+                httpc:set_timeout(1000)
+                local res, err = httpc:request_uri(uri,
+                    {method = "GET", keepalive = false})
+                if not res then
+                    ngx.say(err)
+                    return
+                end
+
+                ports_count[res.body] = (ports_count[res.body] or 0) + 1
+            end
+
+            local ports_arr = {}
+            for port, count in pairs(ports_count) do
+                table.insert(ports_arr, {port = port, count = count})
+            end
+
+            -- sort by port, descending, so the printed order is stable
+            table.sort(ports_arr, function (a, b)
+                return a.port > b.port
+            end)
+
+            ngx.say(require("cjson").encode(ports_arr))
+            ngx.exit(200)
+        }
+    }
+--- request
+GET /t
+--- response_body
+[{"count":1,"port":"1981"},{"count":11,"port":"1980"}]
+--- error_code: 200
+--- no_error_log
+[error]
+
+
+=== TEST 3: about frequency
+--- timeout: 30
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local http = require "resty.http"
+            local uri = "http://127.0.0.1:" .. ngx.var.server_port
+                        .. "/ewma"
+
+            --node: "127.0.0.1:1980": latency is about 0.1s
+            --node: "127.0.0.1:1981": latency is about 0.2s
+            for _ = 1, 2 do
+                local httpc = http.new()
+                local res, err = httpc:request_uri(uri,
+                    {method = "GET", keepalive = false})
+                if not res then
+                    ngx.say(err)
+                    return
+                end
+            end
+
+            --remove the 1981 node,
+            --add the 1982 node
+            --keep two nodes for triggering ewma logic in server_picker
+            --function of balancer phase
+            local code = t('/apisix/admin/routes/1',
+                 ngx.HTTP_PUT,
+                 [[{
+                        "upstream": {
+                            "nodes": {
+                                "127.0.0.1:1980": 100,
+                                "127.0.0.1:1982": 100
+                            },
+                            "type": "ewma"
+                        },
+                        "uri": "/ewma"
+                }]]
+                )
+
+            if code ~= 200 then
+                ngx.say("update route failed")
+                return
+            end
+
+            --give the stale 1981 average time to decay
+            --(DECAY_TIME in apisix/balancer/ewma.lua is 10s)
+            ngx.sleep(20)
+            --keep the node 1980 hot
+            for _ = 1, 12 do
+                local httpc = http.new()
+                local res, err = httpc:request_uri(uri,
+                    {method = "GET", keepalive = false})
+                if not res then
+                    ngx.say(err)
+                    return
+                end
+            end
+
+            --recover the 1981 node
+            code = t('/apisix/admin/routes/1',
+                 ngx.HTTP_PUT,
+                 [[{
+                        "upstream": {
+                            "nodes": {
+                                "127.0.0.1:1980": 100,
+                                "127.0.0.1:1981": 100
+                            },
+                            "type": "ewma"
+                        },
+                        "uri": "/ewma"
+                }]]
+                )
+
+            if code ~= 200 then
+                ngx.say("update route failed")
+                return
+            end
+
+            --should select the 1981 node, because it is idle
+            local httpc = http.new()
+            local res, err = httpc:request_uri(uri,
+                {method = "GET", keepalive = false})
+            if not res then
+                ngx.say(err)
+                return
+            end
+            ngx.say(require("cjson").encode({port = res.body, count = 1}))
+            ngx.exit(200)
+        }
+    }
+--- request
+GET /t
+--- response_body
+{"count":1,"port":"1981"}
+--- error_code: 200
+--- no_error_log
+[error]
+

Reply via email to