This is an automated email from the ASF dual-hosted git repository.

spacewander pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix.git


The following commit(s) were added to refs/heads/master by this push:
     new bbbdf58  refactor: the parent of upstream should point to its original 
src (#3287)
bbbdf58 is described below

commit bbbdf58d555ee89144b6923e751deabf7e0bbf15
Author: 罗泽轩 <spacewander...@gmail.com>
AuthorDate: Fri Jan 15 00:43:39 2021 -0600

    refactor: the parent of upstream should point to its original src (#3287)
    
    Signed-off-by: spacewander <spacewander...@gmail.com>
---
 apisix/core/table.lua                |  45 ++++++---
 apisix/http/service.lua              |   1 +
 apisix/plugin.lua                    |   7 +-
 apisix/plugins/example-plugin.lua    |   2 +-
 apisix/plugins/traffic-split.lua     |   4 +-
 apisix/router.lua                    |   1 +
 apisix/stream/plugins/mqtt-proxy.lua |   2 +-
 apisix/upstream.lua                  |  39 +++-----
 t/node/healthcheck.t                 | 120 +++++++++++++++++++++++-
 t/node/healthcheck2.t                | 175 ++++++++++++++++++++++++++++++++++-
 10 files changed, 342 insertions(+), 54 deletions(-)

diff --git a/apisix/core/table.lua b/apisix/core/table.lua
index e314b39..d0bc1b2 100644
--- a/apisix/core/table.lua
+++ b/apisix/core/table.lua
@@ -96,25 +96,44 @@ function _M.setmt__gc(t, mt)
 end
 
 
-local function deepcopy(orig)
-    local orig_type = type(orig)
-    if orig_type ~= 'table' then
-        return orig
-    end
+local deepcopy
+do
+    local function _deepcopy(orig, copied)
+        -- prevent infinite loop when a field refers its parent
+        copied[orig] = true
+        -- If the array-like table contains nil in the middle,
+        -- the len might be smaller than the expected.
+        -- But it doesn't affect the correctness.
+        local len = #orig
+        local copy = new_tab(len, nkeys(orig) - len)
+        for orig_key, orig_value in pairs(orig) do
+            if type(orig_value) == "table" and not copied[orig_value] then
+                copy[orig_key] = _deepcopy(orig_value, copied)
+            else
+                copy[orig_key] = orig_value
+            end
+        end
 
-    -- If the array-like table contains nil in the middle,
-    -- the len might be smaller than the expected.
-    -- But it doesn't affect the correctness.
-    local len = #orig
-    local copy = new_tab(len, nkeys(orig) - len)
-    for orig_key, orig_value in pairs(orig) do
-        copy[orig_key] = deepcopy(orig_value)
+        return copy
     end
 
-    return copy
+
+    local copied_recorder = {}
+
+    function deepcopy(orig)
+        local orig_type = type(orig)
+        if orig_type ~= 'table' then
+            return orig
+        end
+
+        local res = _deepcopy(orig, copied_recorder)
+        _M.clear(copied_recorder)
+        return res
+    end
 end
 _M.deepcopy = deepcopy
 
+
 local ngx_null = ngx.null
 local function merge(origin, extend)
     for k,v in pairs(extend) do
diff --git a/apisix/http/service.lua b/apisix/http/service.lua
index a23924d..9279dc3 100644
--- a/apisix/http/service.lua
+++ b/apisix/http/service.lua
@@ -79,6 +79,7 @@ local function filter(service)
         service.value.upstream.nodes = new_nodes
     end
 
+    service.value.upstream.parent = service
     core.log.info("filter service: ", core.json.delay_encode(service))
 end
 
diff --git a/apisix/plugin.lua b/apisix/plugin.lua
index 9928cb5..5445cfc 100644
--- a/apisix/plugin.lua
+++ b/apisix/plugin.lua
@@ -401,11 +401,8 @@ local function merge_service_route(service_conf, 
route_conf)
     local route_upstream = route_conf.value.upstream
     if route_upstream then
         new_conf.value.upstream = route_upstream
-
-        if route_upstream.checks then
-            route_upstream.parent = route_conf
-        end
-
+        -- when route's upstream override service's upstream,
+        -- the upstream.parent still point to the route
         new_conf.value.upstream_id = nil
         new_conf.has_domain = route_conf.has_domain
     end
diff --git a/apisix/plugins/example-plugin.lua 
b/apisix/plugins/example-plugin.lua
index e270a5c..83722ff 100644
--- a/apisix/plugins/example-plugin.lua
+++ b/apisix/plugins/example-plugin.lua
@@ -99,7 +99,7 @@ function _M.access(conf, ctx)
 
     local matched_route = ctx.matched_route
     upstream.set(ctx, up_conf.type .. "#route_" .. matched_route.value.id,
-                 ctx.conf_version, up_conf, matched_route)
+                 ctx.conf_version, up_conf)
     return
 end
 
diff --git a/apisix/plugins/traffic-split.lua b/apisix/plugins/traffic-split.lua
index d0bdec7..a94f6dd 100644
--- a/apisix/plugins/traffic-split.lua
+++ b/apisix/plugins/traffic-split.lua
@@ -240,14 +240,16 @@ local function set_upstream(upstream_info, ctx)
 
     local ok, err = upstream.check_schema(up_conf)
     if not ok then
+        core.log.error("failed to validate generated upstream: ", err)
         return 500, err
     end
 
     local matched_route = ctx.matched_route
+    up_conf.parent = matched_route
     local upstream_key = up_conf.type .. "#route_" ..
                          matched_route.value.id .. "_" ..upstream_info.vid
     core.log.info("upstream_key: ", upstream_key)
-    upstream.set(ctx, upstream_key, ctx.conf_version, up_conf, matched_route)
+    upstream.set(ctx, upstream_key, ctx.conf_version, up_conf)
 
     return
 end
diff --git a/apisix/router.lua b/apisix/router.lua
index 0ae2481..0f824e5 100644
--- a/apisix/router.lua
+++ b/apisix/router.lua
@@ -64,6 +64,7 @@ local function filter(route)
         route.value.upstream.nodes = new_nodes
     end
 
+    route.value.upstream.parent = route
     core.log.info("filter route: ", core.json.delay_encode(route))
 end
 
diff --git a/apisix/stream/plugins/mqtt-proxy.lua 
b/apisix/stream/plugins/mqtt-proxy.lua
index b533430..f7cfd88 100644
--- a/apisix/stream/plugins/mqtt-proxy.lua
+++ b/apisix/stream/plugins/mqtt-proxy.lua
@@ -173,7 +173,7 @@ function _M.preread(conf, ctx)
 
     local matched_route = ctx.matched_route
     upstream.set(ctx, up_conf.type .. "#route_" .. matched_route.value.id,
-                 ctx.conf_version, up_conf, matched_route)
+                 ctx.conf_version, up_conf)
     return
 end
 
diff --git a/apisix/upstream.lua b/apisix/upstream.lua
index 096f81e..856f44d 100644
--- a/apisix/upstream.lua
+++ b/apisix/upstream.lua
@@ -33,7 +33,7 @@ local lrucache_checker = core.lrucache.new({
 local _M = {}
 
 
-local function set_directly(ctx, key, ver, conf, parent)
+local function set_directly(ctx, key, ver, conf)
     if not ctx then
         error("missing argument ctx", 2)
     end
@@ -46,24 +46,22 @@ local function set_directly(ctx, key, ver, conf, parent)
     if not conf then
         error("missing argument conf", 2)
     end
-    if not parent then
-        error("missing argument parent", 2)
-    end
 
     ctx.upstream_conf = conf
     ctx.upstream_version = ver
     ctx.upstream_key = key
-    ctx.upstream_healthcheck_parent = parent
+    ctx.upstream_healthcheck_parent = conf.parent
     return
 end
 _M.set = set_directly
 
 
-local function create_checker(upstream, healthcheck_parent)
+local function create_checker(upstream)
     if healthcheck == nil then
         healthcheck = require("resty.healthcheck")
     end
 
+    local healthcheck_parent = upstream.parent
     local checker, err = healthcheck.new({
         name = "upstream#" .. healthcheck_parent.key,
         shm_name = "upstream-healthcheck",
@@ -85,27 +83,18 @@ local function create_checker(upstream, healthcheck_parent)
         end
     end
 
-    if upstream.parent then
-        core.table.insert(upstream.parent.clean_handlers, function ()
-            core.log.info("try to release checker: ", tostring(checker))
-            checker:clear()
-            checker:stop()
-        end)
-
-    else
-        core.table.insert(healthcheck_parent.clean_handlers, function ()
-            core.log.info("try to release checker: ", tostring(checker))
-            checker:clear()
-            checker:stop()
-        end)
-    end
+    core.table.insert(healthcheck_parent.clean_handlers, function ()
+        core.log.info("try to release checker: ", tostring(checker))
+        checker:clear()
+        checker:stop()
+    end)
 
     core.log.info("create new checker: ", tostring(checker))
     return checker
 end
 
 
-local function fetch_healthchecker(upstream, healthcheck_parent, version)
+local function fetch_healthchecker(upstream, version)
     if not upstream.checks then
         return
     end
@@ -115,8 +104,7 @@ local function fetch_healthchecker(upstream, 
healthcheck_parent, version)
     end
 
     local checker = lrucache_checker(upstream, version,
-                                     create_checker, upstream,
-                                     healthcheck_parent)
+                                     create_checker, upstream)
     return checker
 end
 
@@ -150,7 +138,7 @@ function _M.set_by_route(route, api_ctx)
     end
 
     set_directly(api_ctx, up_conf.type .. "#upstream_" .. tostring(up_conf),
-                 api_ctx.conf_version, up_conf, route)
+                 api_ctx.conf_version, up_conf)
 
     local nodes_count = up_conf.nodes and #up_conf.nodes or 0
     if nodes_count == 0 then
@@ -158,7 +146,7 @@ function _M.set_by_route(route, api_ctx)
     end
 
     if nodes_count > 1 then
-        local checker = fetch_healthchecker(up_conf, route, 
api_ctx.upstream_version)
+        local checker = fetch_healthchecker(up_conf, api_ctx.upstream_version)
         api_ctx.up_checker = checker
     end
 
@@ -219,6 +207,7 @@ function _M.init_worker()
                     upstream.value.nodes = new_nodes
                 end
 
+                upstream.value.parent = upstream
                 core.log.info("filter upstream: ", 
core.json.delay_encode(upstream))
             end,
         })
diff --git a/t/node/healthcheck.t b/t/node/healthcheck.t
index 1056a53..d9bacb8 100644
--- a/t/node/healthcheck.t
+++ b/t/node/healthcheck.t
@@ -218,10 +218,10 @@ GET /t
 --- response_body
 [{"count":12,"port":"1980"}]
 --- grep_error_log eval
-qr/unhealthy .* for '.*'/
+qr/\([^)]+\) unhealthy .* for '.*'/
 --- grep_error_log_out
-unhealthy TCP increment (1/2) for 'foo.com(127.0.0.1:1970)'
-unhealthy TCP increment (2/2) for 'foo.com(127.0.0.1:1970)'
+(upstream#/apisix/routes/1) unhealthy TCP increment (1/2) for 
'foo.com(127.0.0.1:1970)'
+(upstream#/apisix/routes/1) unhealthy TCP increment (2/2) for 
'foo.com(127.0.0.1:1970)'
 --- timeout: 10
 
 
@@ -795,3 +795,117 @@ GET /t
 qr/expected 65536 to be smaller than 65535/
 --- error_code chomp
 400
+
+
+
+=== TEST 18: set route + upstream (two upstream node: one healthy + one 
unhealthy)
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/upstreams/1',
+                 ngx.HTTP_PUT,
+                 [[{
+                    "type": "roundrobin",
+                    "nodes": {
+                        "127.0.0.1:1980": 1,
+                        "127.0.0.1:1970": 1
+                    },
+                    "checks": {
+                        "active": {
+                            "http_path": "/status",
+                            "host": "foo.com",
+                            "healthy": {
+                                "interval": 1,
+                                "successes": 1
+                            },
+                            "unhealthy": {
+                                "interval": 1,
+                                "http_failures": 2
+                            }
+                        }
+                    }
+                }]]
+                )
+
+            if code >= 300 then
+                ngx.status = code
+                ngx.say(body)
+                return
+            end
+
+            local code, body = t('/apisix/admin/routes/1',
+                 ngx.HTTP_PUT,
+                 [[{
+                    "uri": "/server_port",
+                    "upstream_id": 1
+                }]]
+                )
+
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+--- request
+GET /t
+--- response_body
+passed
+--- grep_error_log eval
+qr/^.*?\[error\](?!.*process exiting).*/
+--- grep_error_log_out
+
+
+
+=== TEST 19: hit routes, ensure the checker is bound to the upstream
+--- config
+    location /t {
+        content_by_lua_block {
+            local http = require "resty.http"
+            local uri = "http://127.0.0.1:"; .. ngx.var.server_port
+                        .. "/server_port"
+
+            do
+                local httpc = http.new()
+                local res, err = httpc:request_uri(uri, {method = "GET", 
keepalive = false})
+            end
+
+            ngx.sleep(2.5)
+
+            local ports_count = {}
+            for i = 1, 12 do
+                local httpc = http.new()
+                local res, err = httpc:request_uri(uri, {method = "GET", 
keepalive = false})
+                if not res then
+                    ngx.say(err)
+                    return
+                end
+
+                ports_count[res.body] = (ports_count[res.body] or 0) + 1
+            end
+
+            local ports_arr = {}
+            for port, count in pairs(ports_count) do
+                table.insert(ports_arr, {port = port, count = count})
+            end
+
+            local function cmd(a, b)
+                return a.port > b.port
+            end
+            table.sort(ports_arr, cmd)
+
+            ngx.say(require("toolkit.json").encode(ports_arr))
+            ngx.exit(200)
+        }
+    }
+--- request
+GET /t
+--- response_body
+[{"count":12,"port":"1980"}]
+--- grep_error_log eval
+qr/\([^)]+\) unhealthy .* for '.*'/
+--- grep_error_log_out
+(upstream#/apisix/upstreams/1) unhealthy TCP increment (1/2) for 
'foo.com(127.0.0.1:1970)'
+(upstream#/apisix/upstreams/1) unhealthy TCP increment (2/2) for 
'foo.com(127.0.0.1:1970)'
+--- timeout: 10
diff --git a/t/node/healthcheck2.t b/t/node/healthcheck2.t
index 1b25cc8..1c802e2 100644
--- a/t/node/healthcheck2.t
+++ b/t/node/healthcheck2.t
@@ -19,8 +19,10 @@ use t::APISIX 'no_plan';
 
 master_on();
 repeat_each(1);
+log_level('info');
 no_root_location();
 no_shuffle();
+worker_connections(256);
 
 add_block_preprocessor(sub {
     my ($block) = @_;
@@ -37,11 +39,7 @@ _EOC_
     }
 
     if (!$block->request) {
-        $block->set_value("request", "GET /hello");
-    }
-
-    if ((!defined $block->error_log) && (!defined $block->no_error_log)) {
-        $block->set_value("no_error_log", "[error]");
+        $block->set_value("request", "GET /t");
     }
 });
 
@@ -66,4 +64,171 @@ upstreams:
 #END
 --- error_log
 value should match only one schema, but matches both schemas 1 and 2
+--- request
+GET /hello
 --- error_code: 502
+
+
+
+=== TEST 2: route + service
+--- apisix_yaml
+services:
+    - id: 1
+      upstream:
+          type: roundrobin
+          nodes:
+              "127.0.0.1:1980": 1
+              "127.0.0.1:1970": 1
+          checks:
+              active:
+                  http_path: /status
+                  host: foo.com
+                  healthy:
+                      interval: 1
+                      successes: 1
+                  unhealthy:
+                      interval: 1
+                      http_failures: 2
+routes:
+    -
+    service_id: 1
+    uri: /server_port
+#END
+--- config
+    location /t {
+        content_by_lua_block {
+            local http = require "resty.http"
+            local uri = "http://127.0.0.1:"; .. ngx.var.server_port
+                        .. "/server_port"
+
+            do
+                local httpc = http.new()
+                local res, err = httpc:request_uri(uri, {method = "GET", 
keepalive = false})
+            end
+
+            ngx.sleep(2.5)
+
+            local ports_count = {}
+            for i = 1, 12 do
+                local httpc = http.new()
+                local res, err = httpc:request_uri(uri, {method = "GET", 
keepalive = false})
+                if not res then
+                    ngx.say(err)
+                    return
+                end
+
+                ports_count[res.body] = (ports_count[res.body] or 0) + 1
+            end
+
+            local ports_arr = {}
+            for port, count in pairs(ports_count) do
+                table.insert(ports_arr, {port = port, count = count})
+            end
+
+            local function cmd(a, b)
+                return a.port > b.port
+            end
+            table.sort(ports_arr, cmd)
+
+            ngx.say(require("toolkit.json").encode(ports_arr))
+            ngx.exit(200)
+        }
+    }
+--- response_body
+[{"count":12,"port":"1980"}]
+--- grep_error_log eval
+qr/\([^)]+\) unhealthy .* for '.*'/
+--- grep_error_log_out
+(upstream#/services/1) unhealthy TCP increment (1/2) for 
'foo.com(127.0.0.1:1970)'
+(upstream#/services/1) unhealthy TCP increment (2/2) for 
'foo.com(127.0.0.1:1970)'
+--- timeout: 10
+
+
+
+=== TEST 3: route override service
+--- apisix_yaml
+services:
+    - id: 1
+      upstream:
+          type: roundrobin
+          nodes:
+              "127.0.0.2:1980": 1
+              "127.0.0.2:1970": 1
+          checks:
+              active:
+                  http_path: /status
+                  host: foo.com
+                  healthy:
+                      interval: 1
+                      successes: 1
+                  unhealthy:
+                      interval: 1
+                      http_failures: 2
+routes:
+    -
+    service_id: 1
+    uri: /server_port
+    upstream:
+        type: roundrobin
+        nodes:
+            "127.0.0.1:1980": 1
+            "127.0.0.1:1970": 1
+        checks:
+            active:
+                http_path: /status
+                host: foo.com
+                healthy:
+                    interval: 1
+                    successes: 1
+                unhealthy:
+                    interval: 1
+                    http_failures: 2
+#END
+--- config
+    location /t {
+        content_by_lua_block {
+            local http = require "resty.http"
+            local uri = "http://127.0.0.1:"; .. ngx.var.server_port
+                        .. "/server_port"
+
+            do
+                local httpc = http.new()
+                local res, err = httpc:request_uri(uri, {method = "GET", 
keepalive = false})
+            end
+
+            ngx.sleep(2.5)
+
+            local ports_count = {}
+            for i = 1, 12 do
+                local httpc = http.new()
+                local res, err = httpc:request_uri(uri, {method = "GET", 
keepalive = false})
+                if not res then
+                    ngx.say(err)
+                    return
+                end
+
+                ports_count[res.body] = (ports_count[res.body] or 0) + 1
+            end
+
+            local ports_arr = {}
+            for port, count in pairs(ports_count) do
+                table.insert(ports_arr, {port = port, count = count})
+            end
+
+            local function cmd(a, b)
+                return a.port > b.port
+            end
+            table.sort(ports_arr, cmd)
+
+            ngx.say(require("toolkit.json").encode(ports_arr))
+            ngx.exit(200)
+        }
+    }
+--- response_body
+[{"count":12,"port":"1980"}]
+--- grep_error_log eval
+qr/\([^)]+\) unhealthy .* for '.*'/
+--- grep_error_log_out
+(upstream#/routes/arr_1) unhealthy TCP increment (1/2) for 
'foo.com(127.0.0.1:1970)'
+(upstream#/routes/arr_1) unhealthy TCP increment (2/2) for 
'foo.com(127.0.0.1:1970)'
+--- timeout: 10

Reply via email to