This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch handle-purge-seq-and-checkpoint-oddness
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit de52fa78806e98bc622cf19ea6356e5255c6bdb0
Author: Nick Vatamaniuc <[email protected]>
AuthorDate: Thu Mar 12 17:24:15 2026 -0400

    Handle cases when dreyfus checkpoint is out-of-sync with the index
    
    Currently, there are two places where the index purge seq is tracked: in the
    index and in the db local doc checkpoints. Purge sequence folding should 
never
    start below the value in the checkpoint document as that could raise an
    `invalid_start_purge_seq`. Normally both sequences should match, but if they
    don't, try to be explicit about what should happen:
    
     * Index pseq > checkpoint pseq. Index somehow got ahead of the checkpoint. 
Use
     the checkpoint seq and reprocess some purges through the index. This will do
     extra work but should be safe.
    
     * Index pseq < checkpoint pseq. Index somehow got behind the checkpoint 
and it
     looks like it could have skipped purges. For views we reset the index, and
     arguably that's the most correct solution. However, we never really had a
     reset facility for clouseau, so instead choose to emit an error log and let
     the user intervene manually but otherwise keep updating the index.
    
    When updating the purge sequence in clouseau, save an rpc call if we're not
    advancing clouseau's purge sequence. Clouseau as of recently already has a
    check to return `ok` right away if new purge_seq is somehow less or equal to
    the current one, but it's still nice not to have to do an extra round-trip.
---
 src/dreyfus/src/dreyfus_index_updater.erl | 56 +++++++++++++++++++++++++------
 1 file changed, 46 insertions(+), 10 deletions(-)

diff --git a/src/dreyfus/src/dreyfus_index_updater.erl 
b/src/dreyfus/src/dreyfus_index_updater.erl
index 387ab09e2..159663aa3 100644
--- a/src/dreyfus/src/dreyfus_index_updater.erl
+++ b/src/dreyfus/src/dreyfus_index_updater.erl
@@ -30,8 +30,12 @@ update(IndexPid, Index) ->
     erlang:put(io_priority, {search, DbName, IndexName}),
     {ok, Db} = couch_db:open_int(DbName, []),
     try
+        CheckpointPSeq = get_local_doc_purge_seq(Db, Index),
+        {ok, ClouseauPSeq} = clouseau_rpc:get_purge_seq(Index),
+        IdxPurgeSeq = get_index_purge_seq(Db, CheckpointPSeq, ClouseauPSeq, 
DDocId, IndexName),
+        DbPurgeSeq = couch_db:get_purge_seq(Db),
+        TotalPurgeChanges = DbPurgeSeq - IdxPurgeSeq,
         TotalUpdateChanges = couch_db:count_changes_since(Db, CurSeq),
-        TotalPurgeChanges = count_pending_purged_docs_since(Db, IndexPid),
         TotalChanges = TotalUpdateChanges + TotalPurgeChanges,
 
         couch_task_status:add_task([
@@ -49,7 +53,7 @@ update(IndexPid, Index) ->
 
         %ExcludeIdRevs is [{Id1, Rev1}, {Id2, Rev2}, ...]
         %The Rev is the final Rev, not purged Rev.
-        {ok, ExcludeIdRevs} = purge_index(Db, IndexPid, Index),
+        {ok, ExcludeIdRevs} = purge_index(Db, IndexPid, Index, IdxPurgeSeq, 
ClouseauPSeq),
         %% compute on all docs modified since we last computed.
 
         NewCurSeq = couch_db:get_update_seq(Db),
@@ -87,8 +91,7 @@ load_docs(FDI, {I, IndexPid, Db, Proc, Total, LastCommitTime, 
ExcludeIdRevs} = A
             {ok, setelement(1, Acc, I + 1)}
     end.
 
-purge_index(Db, IndexPid, Index) ->
-    {ok, IdxPurgeSeq} = clouseau_rpc:get_purge_seq(IndexPid),
+purge_index(Db, IndexPid, Index, IdxPurgeSeq, OldClouseauPSeq) ->
     Proc = get_os_process(Index#index.def_lang),
     try
         true = proc_prompt(Proc, [<<"add_fun">>, Index#index.def]),
@@ -113,18 +116,19 @@ purge_index(Db, IndexPid, Index) ->
         end,
         {ok, ExcludeList} = couch_db:fold_purge_infos(Db, IdxPurgeSeq, 
FoldFun, []),
         NewPurgeSeq = couch_db:get_purge_seq(Db),
-        ok = clouseau_rpc:set_purge_seq(IndexPid, NewPurgeSeq),
+        case NewPurgeSeq > OldClouseauPSeq of
+            true ->
+                ok = clouseau_rpc:set_purge_seq(IndexPid, NewPurgeSeq);
+            false ->
+                % Save an rpc call if we aren't actually advancing the purge 
sequence
+                ok
+        end,
         update_local_doc(Db, Index, NewPurgeSeq),
         {ok, ExcludeList}
     after
         ret_os_process(Proc)
     end.
 
-count_pending_purged_docs_since(Db, IndexPid) ->
-    DbPurgeSeq = couch_db:get_purge_seq(Db),
-    {ok, IdxPurgeSeq} = clouseau_rpc:get_purge_seq(IndexPid),
-    DbPurgeSeq - IdxPurgeSeq.
-
 update_or_delete_index(IndexPid, Db, DI, Proc) ->
     #doc_info{id = Id, revs = [#rev_info{deleted = Del} | _]} = DI,
     case Del of
@@ -152,6 +156,38 @@ update_local_doc(Db, Index, PurgeSeq) ->
     DocContent = dreyfus_util:get_local_purge_doc_body(Db, DocId, PurgeSeq, 
Index),
     couch_db:update_doc(Db, DocContent, []).
 
+get_local_doc_purge_seq(Db, Index) ->
+    DocId = dreyfus_util:get_local_purge_doc_id(Index#index.sig),
+    % We're implicitly asserting this purge checkpoint doc should exist. This 
is
+    % created either on open or during compaction in on_compact handler
+    {ok, #doc{body = {[_ | _] = Props}}} = couch_db:open_doc(Db, DocId),
+    couch_util:get_value(<<"purge_seq">>, Props).
+
+get_index_purge_seq(Db, CheckpointPSeq, ClouseauPSeq, DDocId, IndexName) when
+    is_integer(CheckpointPSeq), is_integer(ClouseauPSeq), CheckpointPSeq >= 0, 
ClouseauPSeq >= 0
+->
+    if
+        CheckpointPSeq == ClouseauPSeq ->
+            % The default state is that they should match.
+            CheckpointPSeq;
+        CheckpointPSeq > ClouseauPSeq ->
+            % Somehow index fell behind. We should reset the index but don't 
really
+            % have a facility for it, so log an error instead. We still can 
only start folding
+            % purges from the checkpoint sequence onwards and not below.
+            DbName = couch_db:name(Db),
+            Msg = "~p : index pseq:~p is behind the checkpoint pseq:~p db:~p 
ddoc:~p index:~p",
+            couch_log:error(Msg, [?MODULE, ClouseauPSeq, CheckpointPSeq, 
DbName, DDocId, IndexName]),
+            CheckpointPSeq;
+        CheckpointPSeq < ClouseauPSeq ->
+            % Somehow the checkpoint fell behind. Perhaps someone manually
+            % manipulated checkpoint docs or the index, or the system crashed 
right after
+            % the set_purge_seq was called but before the checkpoint doc was
+            % written. Choose to reprocess the changes from the checkpointed
+            % sequence, it may add extra work but should not lead to an
+            % inconsistent index.
+            CheckpointPSeq
+    end.
+
 update_task(NumChanges) ->
     [Changes, Total] = couch_task_status:get([changes_done, total_changes]),
     Changes2 = Changes + NumChanges,

Reply via email to