Repository: couchdb-mem3
Updated Branches:
  refs/heads/COUCHDB-3287-pluggable-storage-engines 24442fb5c -> e2fa78fc6 
(forced update)


Fix stale shards cache

There's a race condition in mem3_shards that can result in having shards
in the cache for a database that's been deleted. This results in a
confused cluster that thinks a database exists until you attempt to open
it.

The fix is to ignore any cache insert requests that come from an older
version of the dbs db than the mem3_shards cache knows about.

Big thanks to @jdoane for the identification and original patch.


Project: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/commit/e2fa78fc
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/tree/e2fa78fc
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/diff/e2fa78fc

Branch: refs/heads/COUCHDB-3287-pluggable-storage-engines
Commit: e2fa78fc6a9d003d36ebfe648cba26c4e0e56765
Parents: 0c9e63e
Author: Paul J. Davis <[email protected]>
Authored: Fri Feb 24 12:55:37 2017 -0600
Committer: Paul J. Davis <[email protected]>
Committed: Wed Mar 22 16:31:00 2017 -0500

----------------------------------------------------------------------
 src/mem3_shards.erl | 54 ++++++++++++++++++++++++++++++++++++++----------
 1 file changed, 43 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/e2fa78fc/src/mem3_shards.erl
----------------------------------------------------------------------
diff --git a/src/mem3_shards.erl b/src/mem3_shards.erl
index 11131a1..fb44218 100644
--- a/src/mem3_shards.erl
+++ b/src/mem3_shards.erl
@@ -27,7 +27,8 @@
 -record(st, {
     max_size = 25000,
     cur_size = 0,
-    changes_pid
+    changes_pid,
+    update_seq
 }).
 
 -include_lib("mem3/include/mem3.hrl").
@@ -180,11 +181,12 @@ init([]) ->
     ets:new(?ATIMES, [ordered_set, protected, named_table]),
     ok = config:listen_for_changes(?MODULE, nil),
     SizeList = config:get("mem3", "shard_cache_size", "25000"),
-    {Pid, _} = spawn_monitor(fun() -> listen_for_changes(get_update_seq()) end),
+    UpdateSeq = get_update_seq(),
     {ok, #st{
         max_size = list_to_integer(SizeList),
         cur_size = 0,
-        changes_pid = Pid
+        changes_pid = start_changes_listener(UpdateSeq),
+        update_seq = UpdateSeq
     }}.
 
 handle_call({set_max_size, Size}, _From, St) ->
@@ -199,12 +201,23 @@ handle_cast({cache_hit, DbName}, St) ->
     couch_stats:increment_counter([mem3, shard_cache, hit]),
     cache_hit(DbName),
     {noreply, St};
-handle_cast({cache_insert, DbName, Shards}, St) ->
+handle_cast({cache_insert, DbName, Shards, UpdateSeq}, St) ->
     couch_stats:increment_counter([mem3, shard_cache, miss]),
-    {noreply, cache_free(cache_insert(St, DbName, Shards))};
+    NewSt = case UpdateSeq < St#st.update_seq of
+        true -> St;
+        false -> cache_free(cache_insert(St, DbName, Shards))
+    end,
+    {noreply, NewSt};
 handle_cast({cache_remove, DbName}, St) ->
     couch_stats:increment_counter([mem3, shard_cache, eviction]),
     {noreply, cache_remove(St, DbName)};
+handle_cast({cache_insert_change, DbName, Shards, UpdateSeq}, St) ->
+    Msg = {cache_insert, DbName, Shards, UpdateSeq},
+    {noreply, NewSt} = handle_cast(Msg, St),
+    {noreply, NewSt#st{update_seq = UpdateSeq}};
+handle_cast({cache_remove_change, DbName, UpdateSeq}, St) ->
+    {noreply, NewSt} = handle_cast({cache_remove, DbName}, St),
+    {noreply, NewSt#st{update_seq = UpdateSeq}};
 handle_cast(_Msg, St) ->
     {noreply, St}.
 
@@ -221,8 +234,9 @@ handle_info({'DOWN', _, _, Pid, Reason}, #st{changes_pid=Pid}=St) ->
     erlang:send_after(5000, self(), {start_listener, Seq}),
     {noreply, NewSt#st{changes_pid=undefined}};
 handle_info({start_listener, Seq}, St) ->
-    {NewPid, _} = spawn_monitor(fun() -> listen_for_changes(Seq) end),
-    {noreply, St#st{changes_pid=NewPid}};
+    {noreply, St#st{
+        changes_pid = start_changes_listener(Seq)
+    }};
 handle_info(restart_config_listener, State) ->
     ok = config:listen_for_changes(?MODULE, nil),
     {noreply, State};
@@ -238,6 +252,21 @@ code_change(_OldVsn, #st{}=St, _Extra) ->
 
 %% internal functions
 
+start_changes_listener(SinceSeq) ->
+    Self = self(),
+    {Pid, _} = erlang:spawn_monitor(fun() ->
+        erlang:spawn_link(fun() ->
+            Ref = erlang:monitor(process, Self),
+            receive
+                {'DOWN', Ref, _, _, _} ->
+                    ok
+            end,
+            exit(shutdown)
+        end),
+        listen_for_changes(SinceSeq)
+    end),
+    Pid.
+
 fold_fun(#full_doc_info{}=FDI, Acc) ->
     DI = couch_doc:to_doc_info(FDI),
     fold_fun(DI, Acc);
@@ -277,10 +306,11 @@ changes_callback({stop, EndSeq}, _) ->
     exit({seq, EndSeq});
 changes_callback({change, {Change}, _}, _) ->
     DbName = couch_util:get_value(<<"id">>, Change),
+    Seq = couch_util:get_value(<<"seq">>, Change),
     case DbName of <<"_design/", _/binary>> -> ok; _Else ->
         case mem3_util:is_deleted(Change) of
         true ->
-            gen_server:cast(?MODULE, {cache_remove, DbName});
+            gen_server:cast(?MODULE, {cache_remove_change, DbName, Seq});
         false ->
             case couch_util:get_value(doc, Change) of
             {error, Reason} ->
@@ -288,13 +318,14 @@ changes_callback({change, {Change}, _}, _) ->
                     [DbName, Reason]);
             {Doc} ->
                 Shards = mem3_util:build_ordered_shards(DbName, Doc),
-                gen_server:cast(?MODULE, {cache_insert, DbName, Shards}),
+                Msg = {cache_insert_change, DbName, Shards, Seq},
+                gen_server:cast(?MODULE, Msg),
                 [create_if_missing(mem3:name(S), mem3:engine(S)) || S
                     <- Shards, mem3:node(S) =:= node()]
             end
         end
     end,
-    {ok, couch_util:get_value(<<"seq">>, Change)};
+    {ok, Seq};
 changes_callback(timeout, _) ->
     ok.
 
@@ -310,8 +341,9 @@ load_shards_from_disk(DbName) when is_binary(DbName) ->
 load_shards_from_db(ShardDb, DbName) ->
     case couch_db:open_doc(ShardDb, DbName, [ejson_body]) of
     {ok, #doc{body = {Props}}} ->
+        Seq = couch_db:get_update_seq(ShardDb),
         Shards = mem3_util:build_ordered_shards(DbName, Props),
-        gen_server:cast(?MODULE, {cache_insert, DbName, Shards}),
+        gen_server:cast(?MODULE, {cache_insert, DbName, Shards, Seq}),
         Shards;
     {not_found, _} ->
         erlang:error(database_does_not_exist, ?b2l(DbName))

Reply via email to