This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch prototype/fdb-replicator
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit e3096acf71bd6ce0ad98190c6bb7d34cd9625369
Author: Nick Vatamaniuc <[email protected]>
AuthorDate: Mon Jul 20 18:31:18 2020 -0400

    [wip] improve job creation logic when docs/dbs change
---
 .../src/couch_replicator_doc_processor.erl         | 83 +++++++++++-----------
 1 file changed, 43 insertions(+), 40 deletions(-)

diff --git a/src/couch_replicator/src/couch_replicator_doc_processor.erl b/src/couch_replicator/src/couch_replicator_doc_processor.erl
index aff2c72..cc35a3c 100644
--- a/src/couch_replicator/src/couch_replicator_doc_processor.erl
+++ b/src/couch_replicator/src/couch_replicator_doc_processor.erl
@@ -72,7 +72,7 @@ after_db_create(DbName, DbUUID) ->
 
 after_db_delete(DbName, DbUUID) ->
     couch_stats:increment_counter([couch_replicator, docs, dbs_deleted]),
-    remove_replications_by_dbname(DbName, UUID)).
+    remove_replications_by_dbname(DbName, DbUUID).
 
 
 after_doc_write(Db, #doc{} = Doc, _NewWinner, _OldWinner, _NewRevId, _Seq) ->
@@ -85,12 +85,23 @@ after_doc_write(Db, #doc{} = Doc, _NewWinner, _OldWinner, _NewRevId, _Seq) ->
 process_change(_Db, #doc{id = <<?DESIGN_DOC_PREFIX, _/binary>>}) ->
     ok;
 
-process_change(#{name := DbName} = Db, #doc{deleted = true} = Doc) ->
-    Id = docs_job_id(DbName, Doc#doc.id),
-    ok = remove_replication_by_doc_job_id(Db, Id);
+process_change(#{} = Db, #doc{deleted = true} = Doc) ->
+    DbName = fabric2_db:name(Db),
+    DbUUID = fabric2_db:uuid(Db),
+    DocJobId = doc_job_id(DbName, Doc#doc.id),
+    couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Db), fun(JTx) ->
+        case couch_jobs_fdb:get_job_data(JTx, ?REP_DOCS, DocJobId) of
+            {ok, #{?DB_NAME := DbName, ?DB_UUID := DbUUID} = Data} ->
+                remove_replication_by_doc_job_id(JTx, DocJobId, Data);
+            _ -> % not found, or ?DB_NAME/?DB_UUID differ
+                ok
+        end
+    end);
 
-process_change(#{name := DbName} = Db, #doc{} = Doc) ->
+process_change(#{} = Db, #doc{} = Doc) ->
     #doc{id = DocId, body = {Props} = Body} = Doc,
+    DbName = fabric2_db:name(Db),
+    DbUUID = fabric2_db:uuid(Db),
     {Rep, Error} = try
         Rep0 = couch_replicator_docs:parse_rep_doc_without_id(Body),
         DocState = get_json_value(<<"_replication_state">>, Props, null),
@@ -100,9 +111,9 @@ process_change(#{name := DbName} = Db, #doc{} = Doc) ->
         throw:{bad_rep_doc, Reason} ->
             {null, couch_replicator_utils:rep_error_to_binary(Reason)}
     end,
-    case couch_jobs:get_job_data(Db, ?REP_DOCS, docs_job_id(DbName, DocId)) of
+    case couch_jobs:get_job_data(Db, ?REP_DOCS, doc_job_id(DbName, DocId)) of
         {error, not_found} ->
-            add_rep_doc_job(Db, DbName, DocId, Rep, Error);
+            add_rep_doc_job(Db, DbName, DbUUID, DocId, Rep, Error);
         {ok, #{?REP := null, ?REP_PARSE_ERROR := Error}}
                 when Rep =:= null ->
             % Same error as before occurred, don't bother updating the job
@@ -110,7 +121,7 @@ process_change(#{name := DbName} = Db, #doc{} = Doc) ->
         {ok, #{?REP := null}} when Rep =:= null ->
             % Error occured but it's a different error. Update the job so user
             % sees the new error
-            add_rep_doc_job(Db, DbName, DocId, Rep, Error);
+            add_rep_doc_job(Db, DbName, DbUUID, DocId, Rep, Error);
         {ok, #{?REP := OldRep, ?REP_PARSE_ERROR := OldError}} ->
             case compare_reps(OldRep, Rep) of
                 true ->
@@ -118,7 +129,7 @@ process_change(#{name := DbName} = Db, #doc{} = Doc) ->
                     % for the replication job have changed, so make it a no-op
                     ok;
                 false ->
-                    add_rep_doc_job(Db, DbName, DocId, Rep, Error)
+                    add_rep_doc_job(Db, DbName, DbUUID, DocId, Rep, Error)
             end
     end.
 
@@ -637,15 +648,19 @@ ejson_doc_state_filter(State, States) when is_list(States), is_atom(State) ->
     lists:member(State, States).
 
 
--spec add_rep_doc_job(any(), binary(), binary(), #{} | null,
+% Add the ?REP_DOCS job for DocId in db DbName (UUID DbUUID). Any existing
+% doc job with the same id, and its replication job if it has one, is
+% removed first, in the same JTx transaction as the add.
+-spec add_rep_doc_job(any(), binary(), binary(), binary(), map() | null,
     binary() | null) -> ok.
-add_rep_doc_job(Tx, DbName, DocId, Rep, RepParseError) ->
-    JobId = docs_job_id(DbName, DocId),
+add_rep_doc_job(Tx, DbName, DbUUID, DocId, Rep, RepParseError) ->
+    DocJobId = doc_job_id(DbName, DocId),
     RepDocData = case Rep of
         null ->
             #{
                 ?REP => null,
                 ?DB_NAME => DbName,
+                ?DB_UUID => DbUUID,
                 ?DOC_ID => DocId,
                 ?STATE => ?ST_INITIALIZING,
                 ?STATE_INFO => RepParseError
@@ -664,46 +679,57 @@ add_rep_doc_job(Tx, DbName, DocId, Rep, RepParseError) ->
             }
     end,
     couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) ->
-       ok = remove_replication_by_doc_job_id(JTx, JobId),
+       case couch_jobs_fdb:get_job_data(JTx, ?REP_DOCS, DocJobId) of
+           {ok, #{} = Data} ->
+               ok = remove_replication_by_doc_job_id(JTx, DocJobId, Data);
+           {error, not_found} ->
+               ok
+       end,
       ok = couch_jobs:add(JTx, ?REP_DOCS, RepDocData)
     end).
 
 
-docs_job_id(DbName, Id) when is_binary(DbName), is_binary(Id) ->
+doc_job_id(DbName, Id) when is_binary(DbName), is_binary(Id) ->
     <<DbName/binary, "|", Id/binary>>.
 
 
--spec remove_replication_by_doc_job_id(Tx, Id) -> ok.
-remove_replication_by_doc_job_id(Tx, Id) ->
-    case couch_jobs:get_job_data(Tx, ?REP_DOCS, Id) of
-        {error, not_found} ->
-            ok;
-        {ok, #{?REP := {?REP_ID :=  null}}} ->
-            couch_jobs:remove(Tx, ?REP_DOCS, Id),
+% Remove the ?REP_DOCS job DocJobId, given its already-fetched job Data.
+% When Data records a binary replication id, the ?REP_JOBS job with that
+% id is removed too, in the same JTx transaction.
+-spec remove_replication_by_doc_job_id(any(), binary(), map()) -> ok.
+remove_replication_by_doc_job_id(JTx, DocJobId, Data) ->
+    case Data of
+        #{?REP := null} ->
+            % ?REP is null when the doc failed to parse: no replication id.
+            couch_jobs:remove(JTx, ?REP_DOCS, DocJobId),
+            ok;
+        #{?REP := #{?REP_ID := null}} ->
+            couch_jobs:remove(JTx, ?REP_DOCS, DocJobId),
             ok;
-        {ok, #{?REP := {?REP_ID := RepId}}} ->
-            couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) ->
-                couch_jobs:remove(JTx, ?REP_JOBS, RepId),
-                couch_jobs:remove(JTx, ?REP_DOCS, Id)
-            end),
+        #{?REP := #{?REP_ID := RepId}} when is_binary(RepId) ->
+            couch_jobs:remove(JTx, ?REP_JOBS, RepId),
+            couch_jobs:remove(JTx, ?REP_DOCS, DocJobId),
             ok
     end.
 
 
--spec remove_replications_by_dbname(DbName) -> ok.
-remove_replications_by_dbname(DbName) ->
-    DbNameSize = byte_size(DbName),
-    Filter = fun
-        (<<DbName:DbNameSize/binary, "|", _, _/binary>>) -> true;
-        (_) -> false
-    end,
-    JobsMap = couch_job:get_jobs(undefined, ?REP_DOCS, Filter),
-    % Batch these into smaller transactions eventually...
-    couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) ->
-        maps:map(fun(Id, _) ->
-            remove_replication_by_doc_job_id(JTx, Id)
-        end, JobsMap)
-    end).
+% Remove every ?REP_DOCS job (and its replication job, if any) whose data
+% records ?DB_NAME = DbName and ?DB_UUID = DbUUID.
+-spec remove_replications_by_dbname(binary(), binary()) -> ok.
+remove_replications_by_dbname(DbName, DbUUID) ->
+    % Match both name and UUID, like the deleted-doc clause of
+    % process_change/2, so doc jobs of other dbs are left untouched.
+    % NOTE(review): assumes every doc job's data carries ?DB_UUID; the
+    % non-null ?REP branch of add_rep_doc_job/6 is outside this diff.
+    Fun = fun({JTx, DocJobId, _, Data}, ok) ->
+        case Data of
+            #{?DB_NAME := DbName, ?DB_UUID := DbUUID} ->
+                ok = remove_replication_by_doc_job_id(JTx, DocJobId, Data);
+            #{} ->
+                ok
+        end
+    end,
+    couch_jobs:fold_jobs(undefined, ?REP_DOCS, Fun, ok).
 
 
 -ifdef(TEST).

Reply via email to