This is an automated email from the ASF dual-hosted git repository.

monkeydluffy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix.git


The following commit(s) were added to refs/heads/master by this push:
     new 0bf719ee2 fix: memory leak caused by timer that never quit (#10614)
0bf719ee2 is described below

commit 0bf719ee21155bc35c7bb4ec80081d57f6c0ab7f
Author: Liu Wei <[email protected]>
AuthorDate: Mon Dec 11 13:50:52 2023 +0800

    fix: memory leak caused by timer that never quit (#10614)
---
 apisix/core/config_etcd.lua | 161 +++++++++++++++++++++++---------------------
 1 file changed, 86 insertions(+), 75 deletions(-)

diff --git a/apisix/core/config_etcd.lua b/apisix/core/config_etcd.lua
index 357f24fa1..f0c3cde1f 100644
--- a/apisix/core/config_etcd.lua
+++ b/apisix/core/config_etcd.lua
@@ -168,119 +168,130 @@ local function do_run_watch(premature)
     watch_ctx.rev = rev + 1
     watch_ctx.started = true
 
-    log.warn("main etcd watcher started, revision=", watch_ctx.rev)
-    for _, sema in pairs(watch_ctx.wait_init) do
-        sema:post()
+    log.info("main etcd watcher started, revision=", watch_ctx.rev)
+
+    if watch_ctx.wait_init then
+        for _, sema in pairs(watch_ctx.wait_init) do
+            sema:post()
+        end
+        watch_ctx.wait_init = nil
     end
-    watch_ctx.wait_init = nil
 
     local opts = {}
     opts.timeout = 50 -- second
     opts.need_cancel = true
+    opts.start_revision = watch_ctx.rev
+
+    log.info("restart watchdir: start_revision=", opts.start_revision)
+
+    local res_func, err, http_cli = watch_ctx.cli:watchdir(watch_ctx.prefix, 
opts)
+    if not res_func then
+        log.error("watchdir err: ", err)
+        ngx_sleep(3)
+        return
+    end
 
-    ::restart_watch::
+    ::watch_event::
     while true do
-        opts.start_revision = watch_ctx.rev
-        log.info("restart watchdir: start_revision=", opts.start_revision)
-        local res_func, err, http_cli = 
watch_ctx.cli:watchdir(watch_ctx.prefix, opts)
-        if not res_func then
-            log.error("watchdir: ", err)
-            ngx_sleep(3)
-            goto restart_watch
+        local res, err = res_func()
+        if log_level >= NGX_INFO then
+            log.info("res_func: ", inspect(res))
         end
 
-        ::watch_event::
-        while true do
-            local res, err = res_func()
-            if log_level >= NGX_INFO then
-                log.info("res_func: ", inspect(res))
+        if not res then
+            if err ~= "closed" and
+                err ~= "timeout" and
+                err ~= "broken pipe"
+            then
+                log.error("wait watch event: ", err)
             end
+            cancel_watch(http_cli)
+            break
+        end
 
-            if not res then
-                if err ~= "closed" and
-                    err ~= "timeout" and
-                    err ~= "broken pipe"
-                then
-                    log.error("wait watch event: ", err)
-                end
-                cancel_watch(http_cli)
-                break
-            end
+        if res.error then
+            log.error("wait watch event: ", inspect(res.error))
+            cancel_watch(http_cli)
+            break
+        end
 
-            if res.error then
-                log.error("wait watch event: ", inspect(res.error))
-                cancel_watch(http_cli)
-                break
-            end
+        if res.result.created then
+            goto watch_event
+        end
 
-            if res.result.created then
-                goto watch_event
+        if res.result.canceled then
+            log.warn("watch canceled by etcd, res: ", inspect(res))
+            if res.result.compact_revision then
+                watch_ctx.rev = tonumber(res.result.compact_revision)
+                log.warn("etcd compacted, compact_revision=", watch_ctx.rev)
+                produce_res(nil, "compacted")
             end
+            cancel_watch(http_cli)
+            break
+        end
 
-            if res.result.canceled then
-                log.warn("watch canceled by etcd, res: ", inspect(res))
-                if res.result.compact_revision then
-                    watch_ctx.rev = tonumber(res.result.compact_revision)
-                    log.warn("etcd compacted, compact_revision=", 
watch_ctx.rev)
-                    produce_res(nil, "compacted")
-                end
-                cancel_watch(http_cli)
-                break
+        -- cleanup
+        local min_idx = 0
+        for _, idx in pairs(watch_ctx.idx) do
+            if (min_idx == 0) or (idx < min_idx) then
+                min_idx = idx
             end
+        end
 
-            -- cleanup
-            local min_idx = 0
-            for _, idx in pairs(watch_ctx.idx) do
-                if (min_idx == 0) or (idx < min_idx) then
-                    min_idx = idx
-                end
-            end
+        for i = 1, min_idx - 1 do
+            watch_ctx.res[i] = false
+        end
 
-            for i = 1, min_idx - 1 do
-                watch_ctx.res[i] = false
+        if min_idx > 100 then
+            for k, idx in pairs(watch_ctx.idx) do
+                watch_ctx.idx[k] = idx - min_idx + 1
             end
-
-            if min_idx > 100 then
-                for k, idx in pairs(watch_ctx.idx) do
-                    watch_ctx.idx[k] = idx - min_idx + 1
-                end
-                -- trim the res table
-                for i = 1, min_idx - 1 do
-                    table.remove(watch_ctx.res, 1)
-                end
+            -- trim the res table
+            for i = 1, min_idx - 1 do
+                table.remove(watch_ctx.res, 1)
             end
+        end
 
-            local rev = tonumber(res.result.header.revision)
-            if rev > watch_ctx.rev then
-                watch_ctx.rev = rev + 1
-            end
-            produce_res(res)
+        local rev = tonumber(res.result.header.revision)
+        if rev > watch_ctx.rev then
+            watch_ctx.rev = rev + 1
         end
+        produce_res(res)
     end
 end
 
 
 local function run_watch(premature)
-    local run_watch_th = ngx_thread_spawn(do_run_watch, premature)
+    local run_watch_th, err = ngx_thread_spawn(do_run_watch, premature)
+    if not run_watch_th then
+        log.error("failed to spawn thread do_run_watch: ", err)
+        return
+    end
 
-    ::restart::
-    local check_worker_th = ngx_thread_spawn(function ()
+    local check_worker_th, err = ngx_thread_spawn(function ()
         while not exiting() do
             ngx_sleep(0.1)
         end
     end)
+    if not check_worker_th then
+        log.error("failed to spawn thread check_worker: ", err)
+        return
+    end
 
-    local ok, err = ngx_thread_wait(check_worker_th)
-
+    local ok, err = ngx_thread_wait(run_watch_th, check_worker_th)
     if not ok then
         log.error("check_worker thread terminates failed, retart checker, 
error: " .. err)
-        ngx_thread_kill(check_worker_th)
-        goto restart
     end
 
     ngx_thread_kill(run_watch_th)
-    -- notify child watchers
-    produce_res(nil, "worker exited")
+    ngx_thread_kill(check_worker_th)
+
+    if not exiting() then
+        ngx_timer_at(0, run_watch)
+    else
+        -- notify child watchers
+        produce_res(nil, "worker exited")
+    end
 end
 
 

Reply via email to