This is an automated email from the ASF dual-hosted git repository. vatamane pushed a commit to branch handle-purge-seq-and-checkpoint-oddness in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit de52fa78806e98bc622cf19ea6356e5255c6bdb0 Author: Nick Vatamaniuc <[email protected]> AuthorDate: Thu Mar 12 17:24:15 2026 -0400 Handle cases when dreyfus checkpoint is out-of-sync with the index Currently, there are two places where the index purge seq is tracked: in the index and in the db local doc checkpoints. Purge sequence folding should never start below the value in the checkpoint document as that could raise an `invalid_start_purge_seq`. Normally both sequences should match, but if they don't, try to be explicit about what should happen: * Index pseq > checkpoint pseq. Index somehow got ahead of the checkpoint. Use the checkpoint seq and reprocess some purges through the index. This will do extra work but should be safe. * Index pseq < checkpoint pseq. Index somehow got behind the checkpoint and it looks like it could have skipped purges. For views we reset the index, and arguably that's the most correct solution. However, we never really had a reset facility for clouseau, so instead choose to emit an error log and let the user intervene manually but otherwise keep updating the index. When updating the purge sequence in clouseau, save an rpc call if we're not advancing clouseau's purge sequence. Clouseau as of recently already has a check to return `ok` right away if the new purge_seq is somehow less than or equal to the current one, but it's still nice not to have to do an extra round-trip. 
--- src/dreyfus/src/dreyfus_index_updater.erl | 56 +++++++++++++++++++++++++------ 1 file changed, 46 insertions(+), 10 deletions(-) diff --git a/src/dreyfus/src/dreyfus_index_updater.erl b/src/dreyfus/src/dreyfus_index_updater.erl index 387ab09e2..159663aa3 100644 --- a/src/dreyfus/src/dreyfus_index_updater.erl +++ b/src/dreyfus/src/dreyfus_index_updater.erl @@ -30,8 +30,12 @@ update(IndexPid, Index) -> erlang:put(io_priority, {search, DbName, IndexName}), {ok, Db} = couch_db:open_int(DbName, []), try + CheckpointPSeq = get_local_doc_purge_seq(Db, Index), + {ok, ClouseauPSeq} = clouseau_rpc:get_purge_seq(Index), + IdxPurgeSeq = get_index_purge_seq(Db, CheckpointPSeq, ClouseauPSeq, DDocId, IndexName), + DbPurgeSeq = couch_db:get_purge_seq(Db), + TotalPurgeChanges = DbPurgeSeq - IdxPurgeSeq, TotalUpdateChanges = couch_db:count_changes_since(Db, CurSeq), - TotalPurgeChanges = count_pending_purged_docs_since(Db, IndexPid), TotalChanges = TotalUpdateChanges + TotalPurgeChanges, couch_task_status:add_task([ @@ -49,7 +53,7 @@ update(IndexPid, Index) -> %ExcludeIdRevs is [{Id1, Rev1}, {Id2, Rev2}, ...] %The Rev is the final Rev, not purged Rev. - {ok, ExcludeIdRevs} = purge_index(Db, IndexPid, Index), + {ok, ExcludeIdRevs} = purge_index(Db, IndexPid, Index, IdxPurgeSeq, ClouseauPSeq), %% compute on all docs modified since we last computed. NewCurSeq = couch_db:get_update_seq(Db), @@ -87,8 +91,7 @@ load_docs(FDI, {I, IndexPid, Db, Proc, Total, LastCommitTime, ExcludeIdRevs} = A {ok, setelement(1, Acc, I + 1)} end. 
-purge_index(Db, IndexPid, Index) -> - {ok, IdxPurgeSeq} = clouseau_rpc:get_purge_seq(IndexPid), +purge_index(Db, IndexPid, Index, IdxPurgeSeq, OldClouseauPSeq) -> Proc = get_os_process(Index#index.def_lang), try true = proc_prompt(Proc, [<<"add_fun">>, Index#index.def]), @@ -113,18 +116,19 @@ purge_index(Db, IndexPid, Index) -> end, {ok, ExcludeList} = couch_db:fold_purge_infos(Db, IdxPurgeSeq, FoldFun, []), NewPurgeSeq = couch_db:get_purge_seq(Db), - ok = clouseau_rpc:set_purge_seq(IndexPid, NewPurgeSeq), + case NewPurgeSeq > OldClouseauPSeq of + true -> + ok = clouseau_rpc:set_purge_seq(IndexPid, NewPurgeSeq); + false -> + % Save an rpc call if aren't actualy advancing the purge sequence + ok + end, update_local_doc(Db, Index, NewPurgeSeq), {ok, ExcludeList} after ret_os_process(Proc) end. -count_pending_purged_docs_since(Db, IndexPid) -> - DbPurgeSeq = couch_db:get_purge_seq(Db), - {ok, IdxPurgeSeq} = clouseau_rpc:get_purge_seq(IndexPid), - DbPurgeSeq - IdxPurgeSeq. - update_or_delete_index(IndexPid, Db, DI, Proc) -> #doc_info{id = Id, revs = [#rev_info{deleted = Del} | _]} = DI, case Del of @@ -152,6 +156,38 @@ update_local_doc(Db, Index, PurgeSeq) -> DocContent = dreyfus_util:get_local_purge_doc_body(Db, DocId, PurgeSeq, Index), couch_db:update_doc(Db, DocContent, []). +get_local_doc_purge_seq(Db, Index) -> + DocId = dreyfus_util:get_local_purge_doc_id(Index#index.sig), + % We're implicitly asserting this purge checkpoint doc should exist. This is + % created either on open or during compaction in on_compact handler + {ok, #doc{body = {[_ | _] = Props}}} = couch_db:open_doc(Db, DocId), + couch_util:get_value(<<"purge_seq">>, Props). + +get_index_purge_seq(Db, CheckpointPSeq, ClouseauPSeq, DDocId, IndexName) when + is_integer(CheckpointPSeq), is_integer(ClouseauPSeq), CheckpointPSeq >= 0, ClouseauPSeq >= 0 +-> + if + CheckpointPSeq == ClouseauPSeq -> + % The default state is that they should match. 
+ CheckpointPSeq; + CheckpointPSeq > ClouseauPSeq -> + % Somehow index fell behind. We should reset the index but don't really + % have a facility for it, so log an error instead. We still can only start folding + % purges from the checkpoint sequence onwards nd not below. + DbName = couch_db:name(Db), + Msg = "~p : index pseq:~p is behind the checkpoint pseq:~p db:~p ddoc:~p index:~p", + couch_log:error(Msg, [?MODULE, ClouseauPSeq, CheckpointPSeq, DbName, DDocId, IndexName]), + CheckpointPSeq; + CheckpointPSeq < ClouseauPSeq -> + % Somehow the checkpoint fell behind. Perhaps someone manually + % manipulated checkpoint docs or the index the system crashed right after + % the set_purge_seq was called but before the checkpoint doc was + % written. Choose to reprocess the changes from the checkpointed + % sequence, it may add extra work but should not lead to an + % inconsistent index. + CheckpointPSeq + end. + update_task(NumChanges) -> [Changes, Total] = couch_task_status:get([changes_done, total_changes]), Changes2 = Changes + NumChanges,
