This is an automated email from the ASF dual-hosted git repository.

membphis pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix.git


The following commit(s) were added to refs/heads/master by this push:
     new ba11e7f  feat: ewma use p2c to improve performance (#3300)
ba11e7f is described below

commit ba11e7fa0a005a6a60cf9360bcbb83c22aa25028
Author: 大可 <hnlq.s...@gmail.com>
AuthorDate: Fri Jan 29 17:17:22 2021 +0800

    feat: ewma use p2c to improve performance (#3300)
---
 apisix/balancer/ewma.lua | 158 +++++++++++++++++++++++++++++---------------
 t/node/ewma.t            | 166 ++++++++++++++++++++++++++++++++++++++++++++++-
 2 files changed, 270 insertions(+), 54 deletions(-)

diff --git a/apisix/balancer/ewma.lua b/apisix/balancer/ewma.lua
index b66b07a..e1b2762 100644
--- a/apisix/balancer/ewma.lua
+++ b/apisix/balancer/ewma.lua
@@ -3,28 +3,50 @@
 -- Inspiration drawn from:
 -- https://github.com/twitter/finagle/blob/1bc837c4feafc0096e43c0e98516a8e1c50c4421
 --   /finagle-core/src/main/scala/com/twitter/finagle/loadbalancer/PeakEwma.scala
-
 local core = require("apisix.core")
+local resty_lock = require("resty.lock")
+
+local nkeys = core.table.nkeys
+local table_insert = core.table.insert
 local ngx = ngx
 local ngx_shared = ngx.shared
 local ngx_now = ngx.now
 local math = math
 local pairs = pairs
+local ipairs = ipairs
 local next = next
+local error = error
 
-local _M = {}
 local DECAY_TIME = 10 -- this value is in seconds
+local LOCK_KEY = ":ewma_key"
 
 local shm_ewma = ngx_shared.balancer_ewma
-local shm_last_touched_at= ngx_shared.balancer_ewma_last_touched_at
+local shm_last_touched_at = ngx_shared.balancer_ewma_last_touched_at
+
+local lrucache_addr = core.lrucache.new({ttl = 300, count = 1024})
+local lrucache_trans_format = core.lrucache.new({ttl = 300, count = 256})
+
+local ewma_lock, ewma_lock_err = resty_lock:new("balancer_ewma_locks", {timeout = 0, exptime = 0.1})
 
-local lrucache_addr = core.lrucache.new({
-    ttl = 300, count = 1024
-})
-local lrucache_trans_format = core.lrucache.new({
-    ttl = 300, count = 256
-})
+local _M = {name = "ewma"}
 
+local function lock(upstream)
+    local _, err = ewma_lock:lock(upstream .. LOCK_KEY)
+    if err and err ~= "timeout" then
+        core.log.error("EWMA Balancer failed to lock: ", err)
+    end
+
+    return err
+end
+
+local function unlock()
+    local ok, err = ewma_lock:unlock()
+    if not ok then
+        core.log.error("EWMA Balancer failed to unlock: ", err)
+    end
+
+    return err
+end
 
 local function decay_ewma(ewma, last_touched_at, rtt, now)
     local td = now - last_touched_at
@@ -35,73 +57,68 @@ local function decay_ewma(ewma, last_touched_at, rtt, now)
     return ewma
 end
 
-
 local function store_stats(upstream, ewma, now)
     local success, err, forcible = shm_last_touched_at:set(upstream, now)
     if not success then
-        core.log.error("balancer_ewma_last_touched_at:set failed ", err)
+        core.log.error("shm_last_touched_at:set failed: ", err)
     end
     if forcible then
-        core.log.warn("balancer_ewma_last_touched_at:set valid items forcibly overwritten")
+        core.log.warn("shm_last_touched_at:set valid items forcibly overwritten")
     end
 
     success, err, forcible = shm_ewma:set(upstream, ewma)
     if not success then
-        core.log.error("balancer_ewma:set failed ", err)
+        core.log.error("shm_ewma:set failed: ", err)
     end
     if forcible then
-        core.log.warn("balancer_ewma:set valid items forcibly overwritten")
+        core.log.warn("shm_ewma:set valid items forcibly overwritten")
     end
 end
 
-
 local function get_or_update_ewma(upstream, rtt, update)
+    if update then
+        local lock_err = lock(upstream)
+        if lock_err ~= nil then
+            return 0, lock_err
+        end
+    end
+
     local ewma = shm_ewma:get(upstream) or 0
+
     local now = ngx_now()
     local last_touched_at = shm_last_touched_at:get(upstream) or 0
     ewma = decay_ewma(ewma, last_touched_at, rtt, now)
 
     if not update then
-        return ewma
+        return ewma, nil
     end
 
     store_stats(upstream, ewma, now)
 
-    return ewma
+    unlock()
+
+    return ewma, nil
 end
 
+local function get_upstream_name(upstream)
+    return upstream.host .. ":" .. upstream.port
+end
 
 local function score(upstream)
     -- Original implementation used names
-    -- Endpoints don't have names, so passing in host:Port as key instead
-    local upstream_name = upstream.host .. ":" .. upstream.port
+    -- Endpoints don't have names, so passing in IP:Port as key instead
+    local upstream_name = get_upstream_name(upstream)
     return get_or_update_ewma(upstream_name, 0, false)
 end
 
-
-local function pick_and_score(peers)
-    local lowest_score_index = 1
-    local lowest_score = score(peers[lowest_score_index])
-    for i = 2, #peers do
-        local new_score = score(peers[i])
-        if new_score < lowest_score then
-            lowest_score_index, lowest_score = i, new_score
-        end
-    end
-
-    return peers[lowest_score_index], lowest_score
-end
-
-
 local function parse_addr(addr)
     local host, port, err = core.utils.parse_addr(addr)
     return {host = host, port = port}, err
 end
 
-
 local function _trans_format(up_nodes)
     -- trans
-    --{"1.2.3.4:80":100,"5.6.7.8:8080":100}
+    -- {"1.2.3.4:80":100,"5.6.7.8:8080":100}
     -- into
     -- [{"host":"1.2.3.4","port":"80"},{"host":"5.6.7.8","port":"8080"}]
     local peers = {}
@@ -119,38 +136,72 @@ local function _trans_format(up_nodes)
     return next(peers) and peers or nil
 end
 
-
 local function _ewma_find(ctx, up_nodes)
     local peers
-    local endpoint
 
-    if not up_nodes
-       or core.table.nkeys(up_nodes) == 0 then
+    if not up_nodes or nkeys(up_nodes) == 0 then
         return nil, 'up_nodes empty'
     end
 
-    peers = lrucache_trans_format(ctx.upstream_key, ctx.upstream_version,
-                                  _trans_format, up_nodes)
+    if ctx.balancer_tried_servers and ctx.balancer_tried_servers_count == nkeys(up_nodes) then
+        return nil, "all upstream servers tried"
+    end
+
+    peers = lrucache_trans_format(ctx.upstream_key, ctx.upstream_version, _trans_format, up_nodes)
     if not peers then
         return nil, 'up_nodes trans error'
     end
 
-    if #peers > 1 then
-        endpoint = pick_and_score(peers)
+    local filtered_peers
+    if ctx.balancer_tried_servers then
+        for _, peer in ipairs(peers) do
+            if not ctx.balancer_tried_servers[get_upstream_name(peer)] then
+                if not filtered_peers then
+                    filtered_peers = {}
+                end
+
+                table_insert(filtered_peers, peer)
+            end
+        end
     else
-        endpoint = peers[1]
+        filtered_peers = peers
     end
 
-    return endpoint.host .. ":" .. endpoint.port
-end
+    local endpoint = filtered_peers[1]
+
+    if #filtered_peers > 1 then
+        local a, b = math.random(1, #filtered_peers), math.random(1, #filtered_peers - 1)
+        if b >= a then
+            b = b + 1
+        end
+
+        local backendpoint
+        endpoint, backendpoint = filtered_peers[a], filtered_peers[b]
+        if score(endpoint) > score(backendpoint) then
+            endpoint = backendpoint
+        end
+    end
 
+    return get_upstream_name(endpoint)
+end
 
 local function _ewma_after_balance(ctx, before_retry)
     if before_retry then
-        -- don't count tries which fail to complete
+        if not ctx.balancer_tried_servers then
+            ctx.balancer_tried_servers = core.tablepool.fetch("balancer_tried_servers", 0, 2)
+        end
+
+        ctx.balancer_tried_servers[ctx.balancer_server] = true
+        ctx.balancer_tried_servers_count = (ctx.balancer_tried_servers_count or 0) + 1
+
         return nil
     end
 
+    if ctx.balancer_tried_servers then
+        core.tablepool.release("balancer_tried_servers", ctx.balancer_tried_servers)
+        ctx.balancer_tried_servers = nil
+    end
+
     local response_time = ctx.var.upstream_response_time or 0
     local connect_time = ctx.var.upstream_connect_time or 0
     local rtt = connect_time + response_time
@@ -163,21 +214,22 @@ local function _ewma_after_balance(ctx, before_retry)
     return get_or_update_ewma(upstream, rtt, true)
 end
 
-
 function _M.new(up_nodes, upstream)
-    if not shm_ewma
-       or not shm_last_touched_at then
+    if not shm_ewma or not shm_last_touched_at then
         return nil, "dictionary not find"
     end
 
+    if not ewma_lock then
+        error(ewma_lock_err)
+    end
+
     return {
         upstream = upstream,
-        get = function (ctx)
+        get = function(ctx)
             return _ewma_find(ctx, up_nodes)
         end,
         after_balance = _ewma_after_balance
     }
 end
 
-
 return _M
diff --git a/t/node/ewma.t b/t/node/ewma.t
index 955a6b7..53bf848 100644
--- a/t/node/ewma.t
+++ b/t/node/ewma.t
@@ -166,7 +166,7 @@ GET /t
                 return
             end
 
-            ngx.sleep(20)
+            ngx.sleep(11)
             --keep the node 1980 hot
             for i = 1, 12 do
                 local httpc = http.new()
@@ -215,3 +215,167 @@ GET /t
 --- error_code: 200
 --- no_error_log
 [error]
+
+
+
+=== TEST 4: about filter tried servers
+--- timeout: 10
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+
+            --remove the 1981 node,
+            --add the 9527 node (invalid node)
+            --keep two nodes for triggering ewma logic in server_picker function of balancer phase
+            local code, body = t('/apisix/admin/routes/1',
+                 ngx.HTTP_PUT,
+                 [[{
+                        "upstream": {
+                            "nodes": {
+                                "127.0.0.1:1980": 1,
+                                "127.0.0.1:9527": 1
+                            },
+                            "type": "ewma",
+                            "timeout": {
+                                "connect": 0.1,
+                                "send": 0.5,
+                                "read": 0.5
+                            }
+                        },
+                        "uri": "/ewma"
+                }]]
+                )
+
+            if code ~= 200 then
+                ngx.say("update route failed")
+                return
+            end
+
+            local http = require "resty.http"
+            local uri = "http://127.0.0.1:" .. ngx.var.server_port
+                        .. "/ewma"
+
+            --should always select the 1980 node, because 0 is invalid
+            local t = {}
+            local ports_count = {}
+            for i = 1, 12 do
+                local th = assert(ngx.thread.spawn(function(i)
+                    local httpc = http.new()
+                    httpc:set_timeout(2000)
+                    local res, err = httpc:request_uri(uri, {method = "GET", keepalive = false})
+                    if not res then
+                        ngx.say(err)
+                        return
+                    end
+                    ports_count[res.body] = (ports_count[res.body] or 0) + 1
+                end, i))
+                table.insert(t, th)
+            end
+            for i, th in ipairs(t) do
+                ngx.thread.wait(th)
+            end
+
+            local ports_arr = {}
+            for port, count in pairs(ports_count) do
+                table.insert(ports_arr, {port = port, count = count})
+            end
+
+            local function cmd(a, b)
+                return a.port > b.port
+            end
+            table.sort(ports_arr, cmd)
+
+            ngx.say(require("toolkit.json").encode(ports_arr))
+            ngx.exit(200)
+        }
+    }
+--- request
+GET /t
+--- response_body
+[{"count":12,"port":"1980"}]
+--- error_code: 200
+--- error_log
+Connection refused) while connecting to upstream
+
+
+
+=== TEST 5: about all endpoints have been retried
+--- timeout: 10
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+
+            --add the 9527 node (invalid node)
+            --add the 9528 node (invalid node)
+            --keep two nodes for triggering ewma logic in server_picker function of balancer phase
+            local code, body = t('/apisix/admin/routes/1',
+                 ngx.HTTP_PUT,
+                 [[{
+                        "upstream": {
+                            "nodes": {
+                                "127.0.0.1:9527": 1,
+                                "127.0.0.1:9528": 1
+                            },
+                            "type": "ewma",
+                            "timeout": {
+                                "connect": 0.1,
+                                "send": 0.5,
+                                "read": 0.5
+                            }
+                        },
+                        "uri": "/ewma"
+                }]]
+                )
+
+            if code ~= 200 then
+                ngx.say("update route failed")
+                return
+            end
+
+            local http = require "resty.http"
+            local uri = "http://127.0.0.1:" .. ngx.var.server_port
+                        .. "/ewma"
+
+            --should always return 502, because both 9527 and 9528 are invalid
+            local t = {}
+            local ports_count = {}
+            for i = 1, 12 do
+                local th = assert(ngx.thread.spawn(function(i)
+                    local httpc = http.new()
+                    httpc:set_timeout(2000)
+                    local res, err = httpc:request_uri(uri, {method = "GET", keepalive = false})
+                    if not res then
+                        ngx.say(err)
+                        return
+                    end
+                    ports_count[res.status] = (ports_count[res.status] or 0) + 1
+                end, i))
+                table.insert(t, th)
+            end
+            for i, th in ipairs(t) do
+                ngx.thread.wait(th)
+            end
+
+            local ports_arr = {}
+            for port, count in pairs(ports_count) do
+                table.insert(ports_arr, {port = port, count = count})
+            end
+
+            local function cmd(a, b)
+                return a.port > b.port
+            end
+            table.sort(ports_arr, cmd)
+
+            ngx.say(require("toolkit.json").encode(ports_arr))
+            ngx.exit(200)
+        }
+    }
+--- request
+GET /t
+--- response_body
+[{"count":12,"port":502}]
+--- error_code: 200
+--- error_log
+Connection refused) while connecting to upstream

Reply via email to