This is an automated email from the ASF dual-hosted git repository. vatamane pushed a commit to branch prototype/fdb-replicator in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit e3096acf71bd6ce0ad98190c6bb7d34cd9625369 Author: Nick Vatamaniuc <[email protected]> AuthorDate: Mon Jul 20 18:31:18 2020 -0400 [wip] improve job creation logic when docs/dbs change --- .../src/couch_replicator_doc_processor.erl | 83 +++++++++++----------- 1 file changed, 43 insertions(+), 40 deletions(-) diff --git a/src/couch_replicator/src/couch_replicator_doc_processor.erl b/src/couch_replicator/src/couch_replicator_doc_processor.erl index aff2c72..cc35a3c 100644 --- a/src/couch_replicator/src/couch_replicator_doc_processor.erl +++ b/src/couch_replicator/src/couch_replicator_doc_processor.erl @@ -72,7 +72,7 @@ after_db_create(DbName, DbUUID) -> after_db_delete(DbName, DbUUID) -> couch_stats:increment_counter([couch_replicator, docs, dbs_deleted]), - remove_replications_by_dbname(DbName, UUID)). + remove_replications_by_dbname(DbName, DbUUID)). after_doc_write(Db, #doc{} = Doc, _NewWinner, _OldWinner, _NewRevId, _Seq) -> @@ -85,12 +85,23 @@ after_doc_write(Db, #doc{} = Doc, _NewWinner, _OldWinner, _NewRevId, _Seq) -> process_change(_Db, #doc{id = <<?DESIGN_DOC_PREFIX, _/binary>>}) -> ok; -process_change(#{name := DbName} = Db, #doc{deleted = true} = Doc) -> - Id = docs_job_id(DbName, Doc#doc.id), - ok = remove_replication_by_doc_job_id(Db, Id); +process_change(#{} = Db, #doc{deleted = true} = Doc) -> + DbName = fabric2_db:name(Db), + DbUUID = fabric2_db:uuid(Db), + DocJobId = doc_job_id(DbName, Doc#doc.id), + couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(JTx) -> + case couch_jobs_fdb:get_job_data(JTx, ?REP_DOCS, DocJobId) of + {ok, #{?DB_NAME := DbName, ?DB_UUID := DbUUID} = Data} -> + remove_replication_by_doc_job_id(JTx, DocJobId, Data); + _ -> + ok + end + end). -process_change(#{name := DbName} = Db, #doc{} = Doc) -> +process_change(#{} = Db, #doc{} = Doc) -> #doc{id = DocId, body = {Props} = Body} = Doc, + DbName = fabric2_db:name(Db), + DbUUID = fabric2_db:uuid(Db), {Rep, Error} = try Rep0 = couch_replicator_docs:parse_rep_doc_without_id(Body), DocState = get_json_value(<<"_replication_state">>, Props, null), @@ -100,9 +111,9 @@ process_change(#{name := DbName} = Db, #doc{} = Doc) -> throw:{bad_rep_doc, Reason} -> {null, couch_replicator_utils:rep_error_to_binary(Reason)} end, - case couch_jobs:get_job_data(Db, ?REP_DOCS, docs_job_id(DbName, DocId)) of + case couch_jobs:get_job_data(Db, ?REP_DOCS, doc_job_id(DbName, DocId)) of {error, not_found} -> - add_rep_doc_job(Db, DbName, DocId, Rep, Error); + add_rep_doc_job(Db, DbName, DbUUID, DocId, Rep, Error); {ok, #{?REP := null, ?REP_PARSE_ERROR := Error}} when Rep =:= null -> % Same error as before occurred, don't bother updating the job @@ -110,7 +121,7 @@ process_change(#{name := DbName} = Db, #doc{} = Doc) -> {ok, #{?REP := null}} when Rep =:= null -> % Error occured but it's a different error. Update the job so user % sees the new error - add_rep_doc_job(Db, DbName, DocId, Rep, Error); + add_rep_doc_job(Db, DbName, DbUUID, DocId, Rep, Error); {ok, #{?REP := OldRep, ?REP_PARSE_ERROR := OldError}} -> case compare_reps(OldRep, Rep) of true -> @@ -118,7 +129,7 @@ process_change(#{name := DbName} = Db, #doc{} = Doc) -> % for the replication job have changed, so make it a no-op ok; false -> - add_rep_doc_job(Db, DbName, DocId, Rep, Error) + add_rep_doc_job(Db, DbUUID, DbName, DocId, Rep, Error) end end. @@ -637,15 +648,16 @@ ejson_doc_state_filter(State, States) when is_list(States), is_atom(State) -> lists:member(State, States). --spec add_rep_doc_job(any(), binary(), binary(), #{} | null, +-spec add_rep_doc_job(any(), binary(), binary(), binary(), #{} | null, binary() | null) -> ok. -add_rep_doc_job(Tx, DbName, DocId, Rep, RepParseError) -> - JobId = docs_job_id(DbName, DocId), +add_rep_doc_job(Tx, DbName, DbUUID, DocId, Rep, RepParseError) -> + DocJobId = doc_job_id(DbName, DocId), RepDocData = case Rep of null -> #{ ?REP => null, ?DB_NAME => DbName, + ?DB_UUID => DbUUID, ?DOC_ID => DocId, ?STATE => ?ST_INITIALIZING, ?STATE_INFO => RepParseError @@ -664,46 +676,37 @@ add_rep_doc_job(Tx, DbName, DocId, Rep, RepParseError) -> } end, couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) -> - ok = remove_replication_by_doc_job_id(JTx, JobId), + case couch_jobs_fdb:get_job_data(JTx, ?REP_DOCS, DocJobId) of + {ok, #{} = Data} -> + ok = remove_replication_by_doc_job_id(JTx, DocJobId, Data); + {error, not_found} -> + ok + end, ok = couch_jobs:add(JTx, ?REP_DOCS, RepDocData) end). -docs_job_id(DbName, Id) when is_binary(DbName), is_binary(Id) -> +doc_job_id(DbName, Id) when is_binary(DbName), is_binary(Id) -> <<DbName/binary, "|", Id/binary>>. --spec remove_replication_by_doc_job_id(Tx, Id) -> ok. -remove_replication_by_doc_job_id(Tx, Id) -> - case couch_jobs:get_job_data(Tx, ?REP_DOCS, Id) of - {error, not_found} -> - ok; - {ok, #{?REP := {?REP_ID := null}}} -> - couch_jobs:remove(Tx, ?REP_DOCS, Id), +-spec remove_replication_by_doc_job_id(any(), binary(), #{}) -> ok. +remove_replication_by_doc_job_id(JTx, DocJobId, Data) -> + case Data of + #{?REP := {?REP_ID := null}} -> + couch_jobs:remove(JTx, ?REP_DOCS, DocJobId), ok; - {ok, #{?REP := {?REP_ID := RepId}}} -> - couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) -> - couch_jobs:remove(JTx, ?REP_JOBS, RepId), - couch_jobs:remove(JTx, ?REP_DOCS, Id) - end), + #{?REP := {?REP_ID := RepId}} when is_binary(RepId) -> + couch_jobs:remove(JTx, ?REP_JOBS, RepId), + couch_jobs:remove(JTx, ?REP_DOCS, DocJobId) ok end. - --spec remove_replications_by_dbname(DbName) -> ok. +-spec remove_replications_by_dbname(DbName, DbUUID) -> ok. remove_replications_by_dbname(DbName) -> - DbNameSize = byte_size(DbName), - Filter = fun - (<<DbName:DbNameSize/binary, "|", _, _/binary>>) -> true; - (_) -> false - end, - JobsMap = couch_job:get_jobs(undefined, ?REP_DOCS, Filter), - % Batch these into smaller transactions eventually... - couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) -> - maps:map(fun(Id, _) -> - remove_replication_by_doc_job_id(JTx, Id) - end, JobsMap) - end). + couch_jobs:fold_jobs(undefined, ?REP_DOCS, fun({JTx, DocJobId, _, Data}, ok) -> + ok = remove_replication_by_doc_job_id(JTx, DocJobId, Data) + end, ok). -ifdef(TEST).
