This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch fix-index-server-premature-return
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit fb67e9e3d883c5e9458f9a64a1e4f680f9479d52
Author: Nick Vatamaniuc <[email protected]>
AuthorDate: Tue Oct 17 01:25:57 2023 -0400

    Fix index server crash when async opener returns too early
    
    Previously, the index process and the still-linked async opener process
    could both crash after we had already replied to the waiting clients. That
    would generate two `EXIT` messages: one for the indexer pid, and one for the
    opener. By the time the opener `EXIT` was handled, the `st.openers` entry was
    already missing (since the `async_open` handle_call clause could have deleted
    it), and so the index server itself would crash.
    
    To fix it, unlink the opener as soon as it reports its result, reply to it,
    and then flush any opener `EXIT` message that may already have been queued
    before the unlink.
---
 src/couch_index/src/couch_index_server.erl | 48 +++++++++++++++++-------------
 1 file changed, 28 insertions(+), 20 deletions(-)

diff --git a/src/couch_index/src/couch_index_server.erl b/src/couch_index/src/couch_index_server.erl
index 35df43d2a..807f87a88 100644
--- a/src/couch_index/src/couch_index_server.erl
+++ b/src/couch_index/src/couch_index_server.erl
@@ -167,19 +167,29 @@ handle_call({get_index, {_Mod, _IdxState, DbName, Sig} = Args}, From, State) ->
         [{_, Pid}] when is_pid(Pid) ->
             {reply, {ok, Pid}, State}
     end;
-handle_call({async_open, {DbName, DDocId, Sig}, {ok, Pid}}, {OpenerPid, _}, State) ->
-    [{_, Waiters}] = ets:lookup(State#st.by_sig, {DbName, Sig}),
-    [gen_server:reply(From, {ok, Pid}) || From <- Waiters],
+handle_call({async_open, {DbName, DDocId, Sig}, {ok, Pid}}, {OpenerPid, _} = FromOpener, State) ->
     link(Pid),
+    % Once linked with the indexer, dismiss and ignore the opener.
+    unlink(OpenerPid),
     ets:delete(State#st.openers, OpenerPid),
-    add_to_ets(DbName, Sig, DDocId, Pid, State),
-    {reply, ok, State};
-handle_call({async_error, {DbName, _DDocId, Sig}, Error}, {OpenerPid, _}, State) ->
+    gen_server:reply(FromOpener, ok),
     [{_, Waiters}] = ets:lookup(State#st.by_sig, {DbName, Sig}),
-    [gen_server:reply(From, Error) || From <- Waiters],
+    add_to_ets(DbName, Sig, DDocId, Pid, State),
+    [gen_server:reply(From, {ok, Pid}) || From <- Waiters],
+    % Flush opener exit messages in case it died before we unlinked it
+    ok = flush_exit_messages_from(OpenerPid),
+    {noreply, State};
+handle_call({async_error, {DbName, _DDocId, Sig}, Error}, {OpenerPid, _} = FromOpener, State) ->
+    % Once opener reported the error, we can dismiss the opener
+    unlink(OpenerPid),
     ets:delete(State#st.openers, OpenerPid),
+    gen_server:reply(FromOpener, ok),
+    [{_, Waiters}] = ets:lookup(State#st.by_sig, {DbName, Sig}),
     ets:delete(State#st.by_sig, {DbName, Sig}),
-    {reply, ok, State};
+    [gen_server:reply(From, Error) || From <- Waiters],
+    % Flush opener exit messages in case it died before we unlinked it
+    ok = flush_exit_messages_from(OpenerPid),
+    {noreply, State};
 handle_call({reset_indexes, DbName}, _From, State) ->
     reset_indexes(DbName, State),
     {reply, ok, State}.
@@ -283,12 +293,7 @@ reset_indexes(DbName, #st{} = State) ->
         [{_, Pid}] = ets:lookup(State#st.by_sig, {DbName, Sig}),
         unlink(Pid),
         gen_server:cast(Pid, delete),
-        receive
-            {'EXIT', Pid, _} ->
-                ok
-        after 0 ->
-            ok
-        end,
+        ok = flush_exit_messages_from(Pid),
         rem_from_ets(DbName, Sig, DDocIds, Pid, State)
     end,
     lists:foreach(Fun, dict:to_list(SigDDocIds)),
@@ -327,12 +332,7 @@ rem_from_ets(DbName, #st{} = State) ->
     Fun = fun({Sig, DDocIds}) ->
         [{_, Pid}] = ets:lookup(State#st.by_sig, {DbName, Sig}),
         unlink(Pid),
-        receive
-            {'EXIT', Pid, _} ->
-                ok
-        after 0 ->
-            ok
-        end,
+        ok = flush_exit_messages_from(Pid),
         rem_from_ets(DbName, Sig, DDocIds, Pid, State)
     end,
     lists:foreach(Fun, dict:to_list(SigDDocIds)).
@@ -442,3 +442,11 @@ aggregate_queue_len() ->
      || Name <- Names
     ],
     lists:sum([X || {_, X} <- MQs]).
+
+flush_exit_messages_from(Pid) ->
+    receive
+        {'EXIT', Pid, _} ->
+            ok
+    after 0 ->
+        ok
+    end.

Reply via email to