This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/couchdb.git


The following commit(s) were added to refs/heads/main by this push:
     new 231055595  Prevent delayed opener error from crashing index servers
231055595 is described below

commit 23105559500941e6463d8d5808e90231dc7365b2
Author: Nick Vatamaniuc <[email protected]>
AuthorDate: Tue Oct 17 01:25:57 2023 -0400

     Prevent delayed opener error from crashing index servers
    
    Previously, an index and opener process dying could have resulted in the 
index
    gen_server crashing. This was observed in the CI test as described in:
    https://github.com/apache/couchdb/issues/4809
    
    The process in more detail was as follows:
    
      * When an async opener result is handled in the index server, there is a
        period of time when the index server is linked to both the index and the
        opener process.
    
      * After we reply early to the waiting clients, a client may do something 
to
        cause the indexer to crash, which would crash the opener along with it.
        That would generate two `{'EXIT', Pid, _}` messages queued in the 
index server
        process' mailbox.
    
      * The index gen_server is still processing the async opener result 
callback,
        where it would remove the opener from the `openers` table, then it
        returns `ok` to the async opener.
    
      * Index gen_server continues processing queued `EXIT` messages in 
`handle_info`:
         - The one for the indexer Pid is handled appropriately
         - The one for the opener leads to an `exit(...)` clause since we ended
           with an unknown process exiting.
    
    To avoid the race condition, and the extra opener `EXIT` message, unlink and
    reply early to the opener, as soon as we linked to the indexer or had received 
the
    error. To avoid the small chance of still getting an `EXIT` message from the
    opener, in case it crashed right before we unlinked, flush any exit messages
    from it. We do a similar flushing in two other places so create a small 
utility
    function to avoid duplicating the code too much.
    
    Fix: https://github.com/apache/couchdb/issues/4809
---
 src/couch_index/src/couch_index_server.erl | 48 +++++++++++++++++-------------
 1 file changed, 28 insertions(+), 20 deletions(-)

diff --git a/src/couch_index/src/couch_index_server.erl 
b/src/couch_index/src/couch_index_server.erl
index 35df43d2a..807f87a88 100644
--- a/src/couch_index/src/couch_index_server.erl
+++ b/src/couch_index/src/couch_index_server.erl
@@ -167,19 +167,29 @@ handle_call({get_index, {_Mod, _IdxState, DbName, Sig} = 
Args}, From, State) ->
         [{_, Pid}] when is_pid(Pid) ->
             {reply, {ok, Pid}, State}
     end;
-handle_call({async_open, {DbName, DDocId, Sig}, {ok, Pid}}, {OpenerPid, _}, 
State) ->
-    [{_, Waiters}] = ets:lookup(State#st.by_sig, {DbName, Sig}),
-    [gen_server:reply(From, {ok, Pid}) || From <- Waiters],
+handle_call({async_open, {DbName, DDocId, Sig}, {ok, Pid}}, {OpenerPid, _} = 
FromOpener, State) ->
     link(Pid),
+    % Once linked with the indexer, dismiss and ignore the opener.
+    unlink(OpenerPid),
     ets:delete(State#st.openers, OpenerPid),
-    add_to_ets(DbName, Sig, DDocId, Pid, State),
-    {reply, ok, State};
-handle_call({async_error, {DbName, _DDocId, Sig}, Error}, {OpenerPid, _}, 
State) ->
+    gen_server:reply(FromOpener, ok),
     [{_, Waiters}] = ets:lookup(State#st.by_sig, {DbName, Sig}),
-    [gen_server:reply(From, Error) || From <- Waiters],
+    add_to_ets(DbName, Sig, DDocId, Pid, State),
+    [gen_server:reply(From, {ok, Pid}) || From <- Waiters],
+    % Flush opener exit messages in case it died before we unlinked it
+    ok = flush_exit_messages_from(OpenerPid),
+    {noreply, State};
+handle_call({async_error, {DbName, _DDocId, Sig}, Error}, {OpenerPid, _} = 
FromOpener, State) ->
+    % Once opener reported the error, we can dismiss the opener
+    unlink(OpenerPid),
     ets:delete(State#st.openers, OpenerPid),
+    gen_server:reply(FromOpener, ok),
+    [{_, Waiters}] = ets:lookup(State#st.by_sig, {DbName, Sig}),
     ets:delete(State#st.by_sig, {DbName, Sig}),
-    {reply, ok, State};
+    [gen_server:reply(From, Error) || From <- Waiters],
+    % Flush opener exit messages in case it died before we unlinked it
+    ok = flush_exit_messages_from(OpenerPid),
+    {noreply, State};
 handle_call({reset_indexes, DbName}, _From, State) ->
     reset_indexes(DbName, State),
     {reply, ok, State}.
@@ -283,12 +293,7 @@ reset_indexes(DbName, #st{} = State) ->
         [{_, Pid}] = ets:lookup(State#st.by_sig, {DbName, Sig}),
         unlink(Pid),
         gen_server:cast(Pid, delete),
-        receive
-            {'EXIT', Pid, _} ->
-                ok
-        after 0 ->
-            ok
-        end,
+        ok = flush_exit_messages_from(Pid),
         rem_from_ets(DbName, Sig, DDocIds, Pid, State)
     end,
     lists:foreach(Fun, dict:to_list(SigDDocIds)),
@@ -327,12 +332,7 @@ rem_from_ets(DbName, #st{} = State) ->
     Fun = fun({Sig, DDocIds}) ->
         [{_, Pid}] = ets:lookup(State#st.by_sig, {DbName, Sig}),
         unlink(Pid),
-        receive
-            {'EXIT', Pid, _} ->
-                ok
-        after 0 ->
-            ok
-        end,
+        ok = flush_exit_messages_from(Pid),
         rem_from_ets(DbName, Sig, DDocIds, Pid, State)
     end,
     lists:foreach(Fun, dict:to_list(SigDDocIds)).
@@ -442,3 +442,11 @@ aggregate_queue_len() ->
      || Name <- Names
     ],
     lists:sum([X || {_, X} <- MQs]).
+
+flush_exit_messages_from(Pid) ->
+    receive
+        {'EXIT', Pid, _} ->
+            ok
+    after 0 ->
+        ok
+    end.

Reply via email to