This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch prototype/fdb-replicator
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit eb2362158dee96d5aa3eb67448109d5578f7d892
Author: Nick Vatamaniuc <[email protected]>
AuthorDate: Mon Jul 20 17:01:36 2020 -0400

    [wip] add doc update and db create/delete callbacks
---
 rel/apps/couch_epi.config                          |  1 +
 .../src/couch_replicator_doc_processor.erl         | 48 ++++++++++++------
 src/couch_replicator/src/couch_replicator_epi.erl  | 58 ++++++++++++++++++++++
 .../src/couch_replicator_fabric2_plugin.erl        | 37 ++++++++++++++
 src/fabric/src/fabric2_db.erl                      | 15 ++++--
 5 files changed, 140 insertions(+), 19 deletions(-)

diff --git a/rel/apps/couch_epi.config b/rel/apps/couch_epi.config
index d371163..f9f49e1 100644
--- a/rel/apps/couch_epi.config
+++ b/rel/apps/couch_epi.config
@@ -16,6 +16,7 @@
     chttpd_epi,
     couch_index_epi,
     couch_views_epi,
+    couch_replicator_epi,
     dreyfus_epi,
     global_changes_epi,
     mango_epi,
diff --git a/src/couch_replicator/src/couch_replicator_doc_processor.erl b/src/couch_replicator/src/couch_replicator_doc_processor.erl
index d892339..aff2c72 100644
--- a/src/couch_replicator/src/couch_replicator_doc_processor.erl
+++ b/src/couch_replicator/src/couch_replicator_doc_processor.erl
@@ -28,8 +28,9 @@
 ]).
 
 -export([
-    during_doc_update/3,
-    after_db_delete/1
+    after_db_create/2,
+    after_db_delete/2,
+    after_doc_write/6
 ]).
 
 -export([
@@ -62,15 +63,24 @@
 -define(MAX_JOBS, 500).
 
 
-during_doc_update(#doc{} = Doc, Db, _UpdateType) ->
-    couch_stats:increment_counter([couch_replicator, docs, db_changes]),
-    ok = process_change(Db, Doc).
+% EPI db monitoring plugin callbacks
 
+after_db_create(DbName, DbUUID) ->
+    couch_stats:increment_counter([couch_replicator, docs, dbs_created]),
+    add_replications_by_dbname(DbName, DbUUID).
 
-after_db_delete(#{name := DbName}) ->
+
+after_db_delete(DbName, DbUUID) ->
     couch_stats:increment_counter([couch_replicator, docs, dbs_deleted]),
-    remove_replications_by_dbname(DbName).
+    remove_replications_by_dbname(DbName, DbUUID).
+
+
+after_doc_write(Db, #doc{} = Doc, _NewWinner, _OldWinner, _NewRevId, _Seq) ->
+    couch_stats:increment_counter([couch_replicator, docs, db_changes]),
+    ok = process_change(Db, Doc).
+
 
+% Process replication doc updates
 
 process_change(_Db, #doc{id = <<?DESIGN_DOC_PREFIX, _/binary>>}) ->
     ok;
@@ -446,15 +456,19 @@ schedule_error_backoff(JTx, Job, ErrorCount) ->
     couch_jobs:resubmit(JTx, Job, trunc(When)).
 
 
-schedule_filter_check(JTx, Job, #{<<"filter_type">> := <<"user">>} = Rep) ->
-    IntervalSec = filter_check_interval_sec(),
-    NowSec = erlang:system_time(second),
-    When = NowSec + 0.5 * IntervalSec + rand:uniform(IntervalSec),
-    couch_jobs:resubmit(JTx, Job, trunc(When)).
-
-schedule_filter_check(_JTx, _Job, #{}) ->
-    ok.
-
+schedule_filter_check(JTx, Job, #{} = Rep) ->
+    #{?OPTIONS := Opts} = Rep,
+    case couch_replicator_filter:parse(Opts) of
+        {ok, {user, _FName, _QP}} ->
+            % For user filters, we have to periodically check the source
+            % in case the filter definition has changed
+            IntervalSec = filter_check_interval_sec(),
+            NowSec = erlang:system_time(second),
+            When = NowSec + 0.5 * IntervalSec + rand:uniform(IntervalSec),
+            couch_jobs:resubmit(JTx, Job, trunc(When));
+        _ ->
+            ok
+    end.
 
 remove_old_state_fields(#{?DOC_STATE := DocState} = RepDocData) when
         DocState =:= ?TRIGGERED orelse DocState =:= ?ERROR ->
@@ -636,6 +650,7 @@ add_rep_doc_job(Tx, DbName, DocId, Rep, RepParseError) ->
                 ?STATE => ?ST_INITIALIZING,
                 ?STATE_INFO => RepParseError
                 ?ERROR_COUNT => 0,
+                ?REP_STATS => #{},
                 ?LAST_UPDATED => erlang:system_time()
             };
         #{} ->
@@ -644,6 +659,7 @@ add_rep_doc_job(Tx, DbName, DocId, Rep, RepParseError) ->
                 ?STATE => ?ST_INITIALIZING,
                 ?ERROR_COUNT => 0,
                 ?LAST_UPDATED => erlang:system_time(),
+                ?REP_STATS => #{},
                 ?STATE_INFO => null
             }
     end,
diff --git a/src/couch_replicator/src/couch_replicator_epi.erl b/src/couch_replicator/src/couch_replicator_epi.erl
new file mode 100644
index 0000000..9fb1790
--- /dev/null
+++ b/src/couch_replicator/src/couch_replicator_epi.erl
@@ -0,0 +1,58 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+
+-module(couch_replicator_epi).
+
+
+-behaviour(couch_epi_plugin).
+
+
+-export([
+    app/0,
+    providers/0,
+    services/0,
+    data_subscriptions/0,
+    data_providers/0,
+    processes/0,
+    notify/3
+]).
+
+
+app() ->
+    couch_replicator.
+
+
+providers() ->
+    [
+        {fabric2_db, couch_replicator_fabric2_plugin}
+    ].
+
+
+services() ->
+    [].
+
+
+data_subscriptions() ->
+    [].
+
+
+data_providers() ->
+    [].
+
+
+processes() ->
+    [].
+
+
+notify(_Key, _Old, _New) ->
+    ok.
diff --git a/src/couch_replicator/src/couch_replicator_fabric2_plugin.erl b/src/couch_replicator/src/couch_replicator_fabric2_plugin.erl
new file mode 100644
index 0000000..7ba5872
--- /dev/null
+++ b/src/couch_replicator/src/couch_replicator_fabric2_plugin.erl
@@ -0,0 +1,37 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+
+-module(couch_replicator_fabric2_plugin).
+
+
+-export([
+    after_db_create/2,
+    after_db_delete/2,
+    after_doc_write/6
+]).
+
+
+
+after_db_create(DbName, DbUUID) ->
+    couch_replicator_doc_processor:after_db_create(DbName, DbUUID),
+    [DbName, DbUUID].
+
+
+after_db_delete(DbName, DbUUID) ->
+    couch_replicator_doc_processor:after_db_delete(DbName, DbUUID),
+    [DbName, DbUUID].
+
+
+after_doc_write(Db, Doc, NewWinner, OldWinner, NewRevId, Seq) ->
+    couch_replicator_doc_processor:after_doc_write(Db, Doc, NewWinner, OldWinner, NewRevId, Seq),
+    [Db, Doc, NewWinner, OldWinner, NewRevId, Seq].
diff --git a/src/fabric/src/fabric2_db.erl b/src/fabric/src/fabric2_db.erl
index 667cf35..3bd97c1 100644
--- a/src/fabric/src/fabric2_db.erl
+++ b/src/fabric/src/fabric2_db.erl
@@ -187,6 +187,7 @@ create(DbName, Options) ->
                 #{} = Db0 ->
                     Db1 = maybe_add_sys_db_callbacks(Db0),
                     ok = fabric2_server:store(Db1),
+                    fabric2_db_plugin:after_db_create(DbName, get_uuid(Db1)),
                     {ok, Db1#{tx := undefined}};
                 Error ->
                     Error
@@ -234,6 +235,7 @@ delete(DbName, Options) ->
                 fabric2_fdb:delete(TxDb)
             end),
             if Resp /= ok -> Resp; true ->
+                fabric2_db_plugin:after_db_delete(DbName, get_uuid(Db)),
                 fabric2_server:remove(DbName)
             end
     end.
@@ -242,9 +244,16 @@ delete(DbName, Options) ->
 undelete(DbName, TgtDbName, TimeStamp, Options) ->
     case validate_dbname(TgtDbName) of
         ok ->
-            fabric2_fdb:transactional(DbName, Options, fun(TxDb) ->
-                fabric2_fdb:undelete(TxDb, TgtDbName, TimeStamp)
-            end);
+            {Resp, DbUUID} = fabric2_fdb:transactional(DbName, Options,
+                fun(TxDb) ->
+                    Res = fabric2_fdb:undelete(TxDb, TgtDbName, TimeStamp),
+                    {Res, get_uuid(TxDb)}
+                end
+            ),
+            if Resp /= ok -> Resp; true ->
+                fabric2_db_plugin:after_db_create(TgtDbName, DbUUID),
+                Resp
+            end;
         Error ->
             Error
     end.

Reply via email to