Author: fdmanana Date: Fri Oct 1 18:00:48 2010 New Revision: 1003599 URL: http://svn.apache.org/viewvc?rev=1003599&view=rev Log: New replicator: avoid having a replication gen_server receive one message for each processed source sequence. These notifications are now grouped into lists of size up to ?DOC_BATCH_SIZE.
Modified: couchdb/branches/new_replicator/src/couchdb/couch_replicator.erl couchdb/branches/new_replicator/src/couchdb/couch_replicator_doc_copiers.erl couchdb/branches/new_replicator/src/couchdb/couch_replicator_rev_finders.erl Modified: couchdb/branches/new_replicator/src/couchdb/couch_replicator.erl URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_replicator.erl?rev=1003599&r1=1003598&r2=1003599&view=diff ============================================================================== --- couchdb/branches/new_replicator/src/couchdb/couch_replicator.erl (original) +++ couchdb/branches/new_replicator/src/couchdb/couch_replicator.erl Fri Oct 1 18:00:48 2010 @@ -278,42 +278,8 @@ handle_info({seq_start, {Seq, NumChanges State#rep_state.seqs_in_progress), {noreply, State#rep_state{seqs_in_progress = SeqsInProgress2}}; -handle_info({seq_changes_done, {Seq, NumChangesDone}}, State) -> - #rep_state{ - seqs_in_progress = SeqsInProgress, - next_through_seqs = DoneSeqs, - is_successor_seq = IsSuccFun - } = State, - % Decrement the # changes for this seq by NumChangesDone. - TotalChanges = gb_trees:get(Seq, SeqsInProgress), - NewState = case TotalChanges - NumChangesDone of - 0 -> - % This seq is completely processed. Check to see if it was the - % smallest seq in progess. If so, we've made progress that can - % be checkpointed. - State2 = case gb_trees:smallest(SeqsInProgress) of - {Seq, _} -> - {CheckpointSeq, DoneSeqs2} = next_seq_before_gap( - Seq, DoneSeqs, IsSuccFun), - State#rep_state{ - current_through_seq = CheckpointSeq, - next_through_seqs = DoneSeqs2 - }; - _ -> - DoneSeqs2 = ordsets:add_element(Seq, DoneSeqs), - State#rep_state{next_through_seqs = DoneSeqs2} - end, - State2#rep_state{ - seqs_in_progress = gb_trees:delete(Seq, SeqsInProgress) - }; - NewTotalChanges when NewTotalChanges > 0 -> - % There are still some changes that need work done. - % Put the new count back. 
- State#rep_state{ - seqs_in_progress = - gb_trees:update(Seq, NewTotalChanges, SeqsInProgress) - } - end, +handle_info({seq_changes_done, Changes}, State) -> + NewState = lists:foldl(fun process_seq_changes_done/2, State, Changes), {noreply, NewState}; handle_info({add_stat, {StatPos, Val}}, #rep_state{stats = Stats} = State) -> @@ -710,3 +676,40 @@ has_session_id(SessionId, [{Props} | Res has_session_id(SessionId, Rest) end. + +process_seq_changes_done({Seq, NumChangesDone}, State) -> + #rep_state{ + seqs_in_progress = SeqsInProgress, + next_through_seqs = DoneSeqs, + is_successor_seq = IsSuccFun + } = State, + % Decrement the # changes for this seq by NumChangesDone. + TotalChanges = gb_trees:get(Seq, SeqsInProgress), + case TotalChanges - NumChangesDone of + 0 -> + % This seq is completely processed. Check to see if it was the + % smallest seq in progess. If so, we've made progress that can + % be checkpointed. + State2 = case gb_trees:smallest(SeqsInProgress) of + {Seq, _} -> + {CheckpointSeq, DoneSeqs2} = next_seq_before_gap( + Seq, DoneSeqs, IsSuccFun), + State#rep_state{ + current_through_seq = CheckpointSeq, + next_through_seqs = DoneSeqs2 + }; + _ -> + DoneSeqs2 = ordsets:add_element(Seq, DoneSeqs), + State#rep_state{next_through_seqs = DoneSeqs2} + end, + State2#rep_state{ + seqs_in_progress = gb_trees:delete(Seq, SeqsInProgress) + }; + NewTotalChanges when NewTotalChanges > 0 -> + % There are still some changes that need work done. + % Put the new count back. + State#rep_state{ + seqs_in_progress = + gb_trees:update(Seq, NewTotalChanges, SeqsInProgress) + } + end. 
Modified: couchdb/branches/new_replicator/src/couchdb/couch_replicator_doc_copiers.erl URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_replicator_doc_copiers.erl?rev=1003599&r1=1003598&r2=1003599&view=diff ============================================================================== --- couchdb/branches/new_replicator/src/couchdb/couch_replicator_doc_copiers.erl (original) +++ couchdb/branches/new_replicator/src/couchdb/couch_replicator_doc_copiers.erl Fri Oct 1 18:00:48 2010 @@ -105,6 +105,8 @@ write_doc(Doc, Seq, Db, Cp) -> seqs_done([{Seq, 1}], Cp). +bulk_write_docs([], _, _, _) -> + ok; bulk_write_docs(Docs, Seqs, Db, Cp) -> case couch_api_wrap:update_docs( Db, Docs, [delay_commit], replicated_changes) of @@ -125,9 +127,9 @@ bulk_write_docs(Docs, Seqs, Db, Cp) -> seqs_done(Seqs, Cp). +seqs_done([], _) -> + ok; +seqs_done([{nil, _} | _], _) -> + ok; seqs_done(SeqCounts, Cp) -> - lists:foreach(fun({nil, _}) -> - ok; - (SeqCount) -> - Cp ! {seq_changes_done, SeqCount} - end, SeqCounts). + Cp ! {seq_changes_done, SeqCounts}. Modified: couchdb/branches/new_replicator/src/couchdb/couch_replicator_rev_finders.erl URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_replicator_rev_finders.erl?rev=1003599&r1=1003598&r2=1003599&view=diff ============================================================================== --- couchdb/branches/new_replicator/src/couchdb/couch_replicator_rev_finders.erl (original) +++ couchdb/branches/new_replicator/src/couchdb/couch_replicator_rev_finders.erl Fri Oct 1 18:00:48 2010 @@ -69,9 +69,9 @@ missing_revs_finder_loop(FinderId, Cp, T #doc_info{id=Id, revs=RevsInfo, high_seq=Seq} <- DocInfos]), NonMissingIdRevsSeqDict = remove_missing(IdRevsSeqDict, Missing), % signal the completion of these that aren't missing - lists:foreach(fun({_Id, {Revs, Seq}}) -> - Cp ! {seq_changes_done, {Seq, length(Revs)}} - end, dict:to_list(NonMissingIdRevsSeqDict)), + Cp ! 
{seq_changes_done, + [{Seq, length(Revs)} || + {_Id, {Revs, Seq}} <- dict:to_list(NonMissingIdRevsSeqDict)]}, % Expand out each docs and seq into it's own work item lists:foreach(fun({Id, Revs, PAs}) ->