This is an automated email from the ASF dual-hosted git repository.
monkeydluffy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix.git
The following commit(s) were added to refs/heads/master by this push:
new 0bf719ee2 fix: memory leak caused by timer that never quit (#10614)
0bf719ee2 is described below
commit 0bf719ee21155bc35c7bb4ec80081d57f6c0ab7f
Author: Liu Wei <[email protected]>
AuthorDate: Mon Dec 11 13:50:52 2023 +0800
fix: memory leak caused by timer that never quit (#10614)
---
apisix/core/config_etcd.lua | 161 +++++++++++++++++++++++---------------------
1 file changed, 86 insertions(+), 75 deletions(-)
diff --git a/apisix/core/config_etcd.lua b/apisix/core/config_etcd.lua
index 357f24fa1..f0c3cde1f 100644
--- a/apisix/core/config_etcd.lua
+++ b/apisix/core/config_etcd.lua
@@ -168,119 +168,130 @@ local function do_run_watch(premature)
watch_ctx.rev = rev + 1
watch_ctx.started = true
- log.warn("main etcd watcher started, revision=", watch_ctx.rev)
- for _, sema in pairs(watch_ctx.wait_init) do
- sema:post()
+ log.info("main etcd watcher started, revision=", watch_ctx.rev)
+
+ if watch_ctx.wait_init then
+ for _, sema in pairs(watch_ctx.wait_init) do
+ sema:post()
+ end
+ watch_ctx.wait_init = nil
end
- watch_ctx.wait_init = nil
local opts = {}
opts.timeout = 50 -- second
opts.need_cancel = true
+ opts.start_revision = watch_ctx.rev
+
+ log.info("restart watchdir: start_revision=", opts.start_revision)
+
+ local res_func, err, http_cli = watch_ctx.cli:watchdir(watch_ctx.prefix, opts)
+ if not res_func then
+ log.error("watchdir err: ", err)
+ ngx_sleep(3)
+ return
+ end
- ::restart_watch::
+ ::watch_event::
while true do
- opts.start_revision = watch_ctx.rev
- log.info("restart watchdir: start_revision=", opts.start_revision)
- local res_func, err, http_cli = watch_ctx.cli:watchdir(watch_ctx.prefix, opts)
- if not res_func then
- log.error("watchdir: ", err)
- ngx_sleep(3)
- goto restart_watch
+ local res, err = res_func()
+ if log_level >= NGX_INFO then
+ log.info("res_func: ", inspect(res))
end
- ::watch_event::
- while true do
- local res, err = res_func()
- if log_level >= NGX_INFO then
- log.info("res_func: ", inspect(res))
+ if not res then
+ if err ~= "closed" and
+ err ~= "timeout" and
+ err ~= "broken pipe"
+ then
+ log.error("wait watch event: ", err)
end
+ cancel_watch(http_cli)
+ break
+ end
- if not res then
- if err ~= "closed" and
- err ~= "timeout" and
- err ~= "broken pipe"
- then
- log.error("wait watch event: ", err)
- end
- cancel_watch(http_cli)
- break
- end
+ if res.error then
+ log.error("wait watch event: ", inspect(res.error))
+ cancel_watch(http_cli)
+ break
+ end
- if res.error then
- log.error("wait watch event: ", inspect(res.error))
- cancel_watch(http_cli)
- break
- end
+ if res.result.created then
+ goto watch_event
+ end
- if res.result.created then
- goto watch_event
+ if res.result.canceled then
+ log.warn("watch canceled by etcd, res: ", inspect(res))
+ if res.result.compact_revision then
+ watch_ctx.rev = tonumber(res.result.compact_revision)
+ log.warn("etcd compacted, compact_revision=", watch_ctx.rev)
+ produce_res(nil, "compacted")
end
+ cancel_watch(http_cli)
+ break
+ end
- if res.result.canceled then
- log.warn("watch canceled by etcd, res: ", inspect(res))
- if res.result.compact_revision then
- watch_ctx.rev = tonumber(res.result.compact_revision)
- log.warn("etcd compacted, compact_revision=", watch_ctx.rev)
- produce_res(nil, "compacted")
- end
- cancel_watch(http_cli)
- break
+ -- cleanup
+ local min_idx = 0
+ for _, idx in pairs(watch_ctx.idx) do
+ if (min_idx == 0) or (idx < min_idx) then
+ min_idx = idx
end
+ end
- -- cleanup
- local min_idx = 0
- for _, idx in pairs(watch_ctx.idx) do
- if (min_idx == 0) or (idx < min_idx) then
- min_idx = idx
- end
- end
+ for i = 1, min_idx - 1 do
+ watch_ctx.res[i] = false
+ end
- for i = 1, min_idx - 1 do
- watch_ctx.res[i] = false
+ if min_idx > 100 then
+ for k, idx in pairs(watch_ctx.idx) do
+ watch_ctx.idx[k] = idx - min_idx + 1
end
-
- if min_idx > 100 then
- for k, idx in pairs(watch_ctx.idx) do
- watch_ctx.idx[k] = idx - min_idx + 1
- end
- -- trim the res table
- for i = 1, min_idx - 1 do
- table.remove(watch_ctx.res, 1)
- end
+ -- trim the res table
+ for i = 1, min_idx - 1 do
+ table.remove(watch_ctx.res, 1)
end
+ end
- local rev = tonumber(res.result.header.revision)
- if rev > watch_ctx.rev then
- watch_ctx.rev = rev + 1
- end
- produce_res(res)
+ local rev = tonumber(res.result.header.revision)
+ if rev > watch_ctx.rev then
+ watch_ctx.rev = rev + 1
end
+ produce_res(res)
end
end
local function run_watch(premature)
- local run_watch_th = ngx_thread_spawn(do_run_watch, premature)
+ local run_watch_th, err = ngx_thread_spawn(do_run_watch, premature)
+ if not run_watch_th then
+ log.error("failed to spawn thread do_run_watch: ", err)
+ return
+ end
- ::restart::
- local check_worker_th = ngx_thread_spawn(function ()
+ local check_worker_th, err = ngx_thread_spawn(function ()
while not exiting() do
ngx_sleep(0.1)
end
end)
+ if not check_worker_th then
+ log.error("failed to spawn thread check_worker: ", err)
+ return
+ end
- local ok, err = ngx_thread_wait(check_worker_th)
-
+ local ok, err = ngx_thread_wait(run_watch_th, check_worker_th)
if not ok then
log.error("check_worker thread terminates failed, retart checker, error: " .. err)
- ngx_thread_kill(check_worker_th)
- goto restart
end
ngx_thread_kill(run_watch_th)
- -- notify child watchers
- produce_res(nil, "worker exited")
+ ngx_thread_kill(check_worker_th)
+
+ if not exiting() then
+ ngx_timer_at(0, run_watch)
+ else
+ -- notify child watchers
+ produce_res(nil, "worker exited")
+ end
end