This is an automated email from the ASF dual-hosted git repository. vatamane pushed a commit to branch prototype/fdb-replicator in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit b75ad64f7ab1fd0b027430d47dad3dd700309069 Author: Nick Vatamaniuc <[email protected]> AuthorDate: Mon Jul 20 18:33:42 2020 -0400 [wip] move stats updates individual jobs and use couch_jobs for it --- .../src/couch_replicator_scheduler.erl | 20 +---- .../src/couch_replicator_scheduler_job.erl | 88 ++++++++++++---------- 2 files changed, 49 insertions(+), 59 deletions(-) diff --git a/src/couch_replicator/src/couch_replicator_scheduler.erl b/src/couch_replicator/src/couch_replicator_scheduler.erl index 00a352b..f0edf93 100644 --- a/src/couch_replicator/src/couch_replicator_scheduler.erl +++ b/src/couch_replicator/src/couch_replicator_scheduler.erl @@ -40,8 +40,7 @@ health_threshold/0, jobs/0, job/1, - restart_job/1, - update_job_stats/2 + restart_job/1 ]). %% config_listener callbacks @@ -217,11 +216,6 @@ restart_job(JobId) -> end. --spec update_job_stats(job_id(), term()) -> ok. -update_job_stats(JobId, Stats) -> - gen_server:cast(?MODULE, {update_job_stats, JobId, Stats}). - - %% gen_server functions init(_) -> @@ -291,16 +285,6 @@ handle_cast({set_interval, Interval}, State) when is_integer(Interval), couch_log:notice("~p: interval set to ~B", [?MODULE, Interval]), {noreply, State#state{interval = Interval}}; -handle_cast({update_job_stats, JobId, Stats}, State) -> - case rep_state(JobId) of - nil -> - ok; - #rep{} = Rep -> - NewRep = Rep#rep{stats = Stats}, - true = ets:update_element(?MODULE, JobId, {#job.rep, NewRep}) - end, - {noreply, State}; - handle_cast(UnexpectedMsg, State) -> couch_log:error("~p: received un-expected cast ~p", [?MODULE, UnexpectedMsg]), {noreply, State}. @@ -1445,7 +1429,6 @@ t_job_summary_running() -> ?assertEqual(0, proplists:get_value(error_count, Summary)), Stats = [{source_seq, <<"1-abc">>}], - handle_cast({update_job_stats, job1, Stats}, mock_state(1)), Summary1 = job_summary(job1, ?DEFAULT_HEALTH_THRESHOLD_SEC), ?assertEqual({Stats}, proplists:get_value(info, Summary1)) end). @@ -1466,7 +1449,6 @@ t_job_summary_pending() -> ?assertEqual(0, proplists:get_value(error_count, Summary)), Stats = [{doc_write_failures, 1}], - handle_cast({update_job_stats, job1, Stats}, mock_state(1)), Summary1 = job_summary(job1, ?DEFAULT_HEALTH_THRESHOLD_SEC), ?assertEqual({Stats}, proplists:get_value(info, Summary1)) end). diff --git a/src/couch_replicator/src/couch_replicator_scheduler_job.erl b/src/couch_replicator/src/couch_replicator_scheduler_job.erl index fef96dd..b5b10ab 100644 --- a/src/couch_replicator/src/couch_replicator_scheduler_job.erl +++ b/src/couch_replicator/src/couch_replicator_scheduler_job.erl @@ -75,20 +75,18 @@ source_seq = nil, use_checkpoints = true, checkpoint_interval = ?DEFAULT_CHECKPOINT_INTERVAL, - type = db, - view = nil, user = null, options = #{} }). -start_link(#{] = Job, #{} = JobData) -> +start_link(#{} = Job, #{} = JobData) -> case gen_server:start_link(?MODULE, {Job, JobData}, []) of {ok, Pid} -> {ok, Pid}; {error, Reason} -> #{?REP := Rep} = JobData, - {<<"id">> := Id, ?SOURCE := Src, ?TARGET := Ttg} = Rep, + {?REP_ID := Id, ?SOURCE := Src, ?TARGET := Ttg} = Rep, Source = couch_replicator_api_wrap:db_uri(Src), Target = couch_replicator_api_wrap:db_uri(Tgt), ErrMsg = "failed to start replication `~s` (`~s` -> `~s`)", @@ -211,8 +209,7 @@ handle_call({report_seq_done, Seq, StatsInc}, From, seqs_in_progress = NewSeqsInProgress, highest_seq_done = NewHighestDone }, - update_task(NewState), - {noreply, NewState}. + {noreply, update_task(NewState)}. handle_cast(checkpoint, State) -> @@ -342,14 +339,13 @@ terminate(shutdown, #rep_state{id = RepId} = State) -> LogMsg = "~p : Failed last checkpoint. Job: ~p Error: ~p", couch_log:error(LogMsg, [?MODULE, RepId, Error]), State - end, - finish_couch_job(State1, <<"stopped">>, null), + end, finish_couch_job(State1, <<"stopped">>, null), terminate_cleanup(State1); terminate({shutdown, max_backoff}, {error, {#{} = Job, #{} = JobData}}) -> % Here we handle the case when replication fails during initialization. % That is before the #rep_state{} is even built. - #{?REP := #{<<"id">> := RepId}} = JobData, + #{?REP := #{?REP_ID := RepId}} = JobData, couch_stats:increment_counter([couch_replicator, failed_starts]), couch_log:warning("Replication `~s` reached max backoff ", [RepId]), finish_couch_job(Job, JobData, <<"error">>, max_backoff); @@ -358,11 +354,11 @@ terminate({shutdown, {error, Error}}, {error, Class, Stack, {Job, JobData}}) -> % Here we handle the case when replication fails during initialization. #{?REP := Rep} = JobData, #{ - <<"id">> := Id, + ?REP_ID := Id, ?SOURCE := Source0, ?TARGET := Target0, - <<"doc_id">> := DocId, - <<"db_name">> := DbName + ?DOC_ID := DocId, + ?DB_NAME := DbName } = Rep, Source = couch_replicator_api_wrap:db_uri(Source0), Target = couch_replicator_api_wrap:db_uri(Target0), @@ -542,8 +538,8 @@ finish_couch_job(#rep_state{} = State, FinishedState, Result) -> finish_couch_job(#{} = Job, #{} = JobData, FinishState, Result0) -> - #{?REP := #{<<"id">> := RepId}} = JobData, - case Result of + #{?REP := #{?REP_ID := RepId}} = JobData, + Result = case Result0 of null -> null; #{} -> Result0; <<_/binary>> -> Result0; @@ -583,19 +579,17 @@ cancel_timer(#rep_state{timer = Timer} = State) -> State#rep_state{timer = nil}. -init_state(#{} = Job, #{?REP =: Rep}} = JobData) -> +init_state(#{} = Job, #{} = JobData) -> + #{?REP := Rep} = JobData, #{ - <<"id">> := Id, - <<"base_id">> := BaseId, + ?REP_ID := Id, + ?BASE_ID := BaseId, ?SOURCE := Src0, ?TARGET := Tgt, - <<"type">> := Type, - <<"view">> := View, - <<"start_time">> := StartTime, - <<"stats">> := ArgStats0, - <<"options">> := OptionsMap, - <<"db_name">> := DbName, - <<"doc_id">> := DocId, + ?START_TIME := StartTime, + ?OPTIONS := OptionsMap, + ?DB_NAME := DbName, + ?DOC_ID := DocId, } = Rep, Options = maps:fold(fun(K, V, Acc) -> @@ -616,12 +610,13 @@ init_state(#{} = Job, #{?REP =: Rep}} = JobData) -> {StartSeq0, History} = compare_replication_logs(SourceLog, TargetLog), - ArgStats1 = couch_replicator_stats:new(ArgStats0), + #{?REP_STATS := Stats0} = JobData, + Stats1 = couch_replicator_stats:new(Stats0), HistoryStats = case History of [{[_ | _] = HProps} | _] -> couch_replicator_stats:new(HProps); _ -> couch_replicator_stats:new() end, - Stats = couch_replicator_stats:max_stats(ArgStats1, HistoryStats), + Stats2 = couch_replicator_stats:max_stats(Stats1, HistoryStats), StartSeq1 = get_value(since_seq, Options, StartSeq0), StartSeq = {0, StartSeq1}, @@ -652,16 +647,14 @@ init_state(#{} = Job, #{?REP =: Rep}} = JobData) -> source_seq = SourceSeq, use_checkpoints = get_value(use_checkpoints, Options), checkpoint_interval = get_value(checkpoint_interval, Options), - type = Type, - view = View, - stats = Stats + stats = Stats2, doc_id = DocId, db_name = DbName }, State#rep_state{timer = start_timer(State)}. -find_and_migrate_logs(DbList, #{<<"base_id">> := BaseId} = Rep) -> +find_and_migrate_logs(DbList, #{?BASE_ID := BaseId} = Rep) -> LogId = ?l2b(?LOCAL_DOC_PREFIX ++ BaseId), fold_replication_logs(DbList, ?REP_ID_VERSION, LogId, LogId, State, []). @@ -689,7 +682,7 @@ fold_replication_logs([Db | Rest] = Dbs, Vsn, LogId, NewId, #{} = Rep, Acc) -> end. -maybe_save_migrated_log(#{<<"options">> = Options}, Db, #doc{} = Doc, OldId) -> +maybe_save_migrated_log(#{?OPTIONS := Options}, Db, #doc{} = Doc, OldId) -> case maps:get(<<"use_checkpoints">>, Options) of true -> update_checkpoint(Db, Doc), @@ -736,8 +729,7 @@ do_checkpoint(#rep_state{use_checkpoints=false} = State) -> NewState = State#rep_state{checkpoint_history = {[{<<"use_checkpoints">>, false}]} }, {ok, NewState}; do_checkpoint(#rep_state{current_through_seq=Seq, committed_seq=Seq} = State) -> - update_task(State), - {ok, State}; + {ok, update_task(State)}; do_checkpoint(State) -> #rep_state{ source_name=SourceName, @@ -818,8 +810,7 @@ do_checkpoint(State) -> source_log = SourceLog#doc{revs={SrcRevPos, [SrcRevId]}}, target_log = TargetLog#doc{revs={TgtRevPos, [TgtRevId]}} }, - update_task(NewState), - {ok, NewState} + {ok, update_task(NewState)} catch throw:{checkpoint_commit_failure, _} = Failure -> Failure end; @@ -996,18 +987,35 @@ get_pending_count_int(#rep_state{source = Db}=St) -> Pending. -update_task(State) -> +update_task(#rep_state{} = State) -> #rep_state{ - rep_details = #rep{id = JobId}, current_through_seq = {_, ThroughSeq}, highest_seq_done = {_, HighestSeq} } = State, - Status = rep_stats(State) ++ [ + NewStats = rep_stats(State) ++ [ {source_seq, HighestSeq}, {through_seq, ThroughSeq} ], - couch_replicator_scheduler:update_job_stats(JobId, Status), - couch_task_status:update(Status). + {ok, NewState} = update_job_stats(State, NewStats), + couch_task_status:update(Status), + NewState. + + +update_job_stats(#rep_state{} = State, NewStats) -> + #rep_state{ + job = Job, + job_data = JobData + } = State, + JsonStats = couch_replicator_stats:to_json(NewStats), + JobData1 = JobData#{?REP_STATS => JsonStats}, + case couch_jobs:update(undefined, Job, JobData1) of + {ok, Job1} -> + {ok, State#rep_state{job := Job1}}; + {error, halt} -> + ErrMsg = "~p : job halted, replication id: ~p", + couch_log:error(ErrMsg, [?MODULE, State#rep_state.id]), + error({error, halt}) + end. rep_stats(State) ->
