This is an automated email from the ASF dual-hosted git repository. vatamane pushed a commit to branch prototype/fdb-replicator in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit eb2362158dee96d5aa3eb67448109d5578f7d892 Author: Nick Vatamaniuc <[email protected]> AuthorDate: Mon Jul 20 17:01:36 2020 -0400 [wip] add doc update and db create/delete callbacks --- rel/apps/couch_epi.config | 1 + .../src/couch_replicator_doc_processor.erl | 48 ++++++++++++------ src/couch_replicator/src/couch_replicator_epi.erl | 58 ++++++++++++++++++++++ .../src/couch_replicator_fabric2_plugin.erl | 37 ++++++++++++++ src/fabric/src/fabric2_db.erl | 15 ++++-- 5 files changed, 140 insertions(+), 19 deletions(-) diff --git a/rel/apps/couch_epi.config b/rel/apps/couch_epi.config index d371163..f9f49e1 100644 --- a/rel/apps/couch_epi.config +++ b/rel/apps/couch_epi.config @@ -16,6 +16,7 @@ chttpd_epi, couch_index_epi, couch_views_epi, + couch_replicator_epi, dreyfus_epi, global_changes_epi, mango_epi, diff --git a/src/couch_replicator/src/couch_replicator_doc_processor.erl b/src/couch_replicator/src/couch_replicator_doc_processor.erl index d892339..aff2c72 100644 --- a/src/couch_replicator/src/couch_replicator_doc_processor.erl +++ b/src/couch_replicator/src/couch_replicator_doc_processor.erl @@ -28,8 +28,9 @@ ]). -export([ - during_doc_update/3, - after_db_delete/1 + after_db_create/2, + after_db_delete/2, + after_doc_write/6 ]). -export([ @@ -62,15 +63,24 @@ -define(MAX_JOBS, 500). -during_doc_update(#doc{} = Doc, Db, _UpdateType) -> - couch_stats:increment_counter([couch_replicator, docs, db_changes]), - ok = process_change(Db, Doc). +% EPI db monitoring plugin callbacks +after_db_create(DbName, DbUUID) -> + couch_stats:increment_counter([couch_replicator, docs, dbs_created]), + add_replications_by_dbname(DbName, DbUUID). -after_db_delete(#{name := DbName}) -> + +after_db_delete(DbName, DbUUID) -> couch_stats:increment_counter([couch_replicator, docs, dbs_deleted]), - remove_replications_by_dbname(DbName). + remove_replications_by_dbname(DbName, UUID)). + + +after_doc_write(Db, #doc{} = Doc, _NewWinner, _OldWinner, _NewRevId, _Seq) -> + couch_stats:increment_counter([couch_replicator, docs, db_changes]), + ok = process_change(Db, Doc). + +% Process replication doc updates process_change(_Db, #doc{id = <<?DESIGN_DOC_PREFIX, _/binary>>}) -> ok; @@ -446,15 +456,19 @@ schedule_error_backoff(JTx, Job, ErrorCount) -> couch_jobs:resubmit(JTx, Job, trunc(When)). -schedule_filter_check(JTx, Job, #{<<"filter_type">> := <<"user">>} = Rep) -> - IntervalSec = filter_check_interval_sec(), - NowSec = erlang:system_time(second), - When = NowSec + 0.5 * IntervalSec + rand:uniform(IntervalSec), - couch_jobs:resubmit(JTx, Job, trunc(When)). - -schedule_filter_check(_JTx, _Job, #{}) -> - ok. - +schedule_filter_check(JTx, Job, #{} = Rep) -> + #{?OPTIONS := Opts} = Rep, + case couch_replicator_filter:parse(Opts) of + {ok, {user, _FName, _QP}} -> + % For user filters, we have to periodically check the source + % in case the filter defintion has changed + IntervalSec = filter_check_interval_sec(), + NowSec = erlang:system_time(second), + When = NowSec + 0.5 * IntervalSec + rand:uniform(IntervalSec), + couch_jobs:resubmit(JTx, Job, trunc(When)); + _ -> + ok + end. remove_old_state_fields(#{?DOC_STATE := DocState} = RepDocData) when DocState =:= ?TRIGGERED orelse DocState =:= ?ERROR -> @@ -636,6 +650,7 @@ add_rep_doc_job(Tx, DbName, DocId, Rep, RepParseError) -> ?STATE => ?ST_INITIALIZING, ?STATE_INFO => RepParseError ?ERROR_COUNT => 0, + ?REP_STATS => #{}, ?LAST_UPDATED => erlang:system_time() }; #{} -> @@ -644,6 +659,7 @@ add_rep_doc_job(Tx, DbName, DocId, Rep, RepParseError) -> ?STATE => ?ST_INITIALIZING, ?ERROR_COUNT => 0, ?LAST_UPDATED => erlang:system_time(), + ?REP_STATS => #{}, ?STATE_INFO => null } end, diff --git a/src/couch_replicator/src/couch_replicator_epi.erl b/src/couch_replicator/src/couch_replicator_epi.erl new file mode 100644 index 0000000..9fb1790 --- /dev/null +++ b/src/couch_replicator/src/couch_replicator_epi.erl @@ -0,0 +1,58 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + + +-module(couch_replicator_epi). + + +-behaviour(couch_epi_plugin). + + +-export([ + app/0, + providers/0, + services/0, + data_subscriptions/0, + data_providers/0, + processes/0, + notify/3 +]). + + +app() -> + couch_replicator. + + +providers() -> + [ + {fabric2_db, couch_replicator_fabric2_plugin} + ]. + + +services() -> + []. + + +data_subscriptions() -> + []. + + +data_providers() -> + []. + + +processes() -> + []. + + +notify(_Key, _Old, _New) -> + ok. diff --git a/src/couch_replicator/src/couch_replicator_fabric2_plugin.erl b/src/couch_replicator/src/couch_replicator_fabric2_plugin.erl new file mode 100644 index 0000000..7ba5872 --- /dev/null +++ b/src/couch_replicator/src/couch_replicator_fabric2_plugin.erl @@ -0,0 +1,37 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + + +-module(couch_replicator_fabric2_plugin). + + +-export([ + after_db_create/2, + after_db_delete/2, + after_doc_write/6 +]). + + + +after_db_create(DbName, DbUUID) -> + couch_replicator_doc_processor:after_db_create(DbName, DbUUID), + [DbName, DbUUID]. + + +after_db_delete(DbName, DbUUID) -> + couch_replicator_doc_processor:after_db_delete(DbName, DbUUID), + [DbName, DbUUID]. + + +after_doc_write(Db, Doc, NewWinner, OldWinner, NewRevId, Seq)-> + couch_replicator_doc_processor:after_doc_write(Db, Doc, NewWinner, OldWinner, NewRevId, Seq), + [Db, Doc, NewWinner, OldWinner, NewRevId, Seq]. diff --git a/src/fabric/src/fabric2_db.erl b/src/fabric/src/fabric2_db.erl index 667cf35..3bd97c1 100644 --- a/src/fabric/src/fabric2_db.erl +++ b/src/fabric/src/fabric2_db.erl @@ -187,6 +187,7 @@ create(DbName, Options) -> #{} = Db0 -> Db1 = maybe_add_sys_db_callbacks(Db0), ok = fabric2_server:store(Db1), + fabric2_db_plugin:after_db_create(DbName, get_uuid(Db1)), {ok, Db1#{tx := undefined}}; Error -> Error @@ -234,6 +235,7 @@ delete(DbName, Options) -> fabric2_fdb:delete(TxDb) end), if Resp /= ok -> Resp; true -> + fabric2_db_plugin:after_db_delete(DbName, get_uuid(Db)), fabric2_server:remove(DbName) end end. @@ -242,9 +244,16 @@ delete(DbName, Options) -> undelete(DbName, TgtDbName, TimeStamp, Options) -> case validate_dbname(TgtDbName) of ok -> - fabric2_fdb:transactional(DbName, Options, fun(TxDb) -> - fabric2_fdb:undelete(TxDb, TgtDbName, TimeStamp) - end); + {Resp, DbUUID} = fabric2_fdb:transactional(DbName, Options, + fun(TxDb) -> + Res = fabric2_fdb:undelete(TxDb, TgtDbName, TimeStamp), + {Res, get_uuid(TxDb)} + end + end), + if Resp /= ok -> Resp; true -> + fabric2_db_plugin:after_db_create(DbName, DbUUID), + Resp + end; Error -> Error end.
