Author: kocolosk
Date: Wed Oct 28 19:44:26 2009
New Revision: 830742
URL: http://svn.apache.org/viewvc?rev=830742&view=rev
Log:
reboot replication from last checkpoint if DB is compacted or server restarts
Modified:
couchdb/branches/0.10.x/ (props changed)
couchdb/branches/0.10.x/etc/default/couchdb (props changed)
couchdb/branches/0.10.x/src/couchdb/couch_rep.erl
couchdb/branches/0.10.x/src/couchdb/couch_rep_reader.erl
Propchange: couchdb/branches/0.10.x/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Oct 28 19:44:26 2009
@@ -4,4 +4,4 @@
/couchdb/branches/list-iterator:782292-784593
/couchdb/branches/tail_header:775760-778477
/couchdb/tags/0.10.0:825400
-/couchdb/trunk:806983,807208-807478,807771,808574,808632,808716,808876,809134,809977,810015,810028,810350,810358,810435,811910,813803,815921,817278,817398,817400,817403,817749,817793,818249,818357,819091,819341,819343,819436,819799,819977,820344,820469,820495,820851,825407,826692,829871,829919-829920
+/couchdb/trunk:806983,807208-807478,807771,808574,808632,808716,808876,809134,809977,810015,810028,810350,810358,810435,811910,813803,815921,817278,817398,817400,817403,817749,817793,818249,818357,819091,819341,819343,819436,819799,819977,820344,820469,820495,820851,825407,826692,829871,829919-829920,830737
Propchange: couchdb/branches/0.10.x/etc/default/couchdb
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Oct 28 19:44:26 2009
@@ -4,5 +4,5 @@
/couchdb/branches/list-iterator/etc/default/couchdb:782292-784593
/couchdb/branches/tail_header/etc/default/couchdb:775760-778477
/couchdb/tags/0.10.0/etc/default/couchdb:825400
-/couchdb/trunk/etc/default/couchdb:806983,807208-807478,807771,808574,808632,808716,808876,809134,809977,810015,810028,810350,810358,810435,811910,813803,815921,817277-817278,817398,817400,817403,817749,817793,818249,818357,819091,819341,819343,819436,819799,819977,820344,820469,820495,820851,825407,826692,829871,829919-829920
+/couchdb/trunk/etc/default/couchdb:806983,807208-807478,807771,808574,808632,808716,808876,809134,809977,810015,810028,810350,810358,810435,811910,813803,815921,817277-817278,817398,817400,817403,817749,817793,818249,818357,819091,819341,819343,819436,819799,819977,820344,820469,820495,820851,825407,826692,829871,829919-829920,830737
/incubator/couchdb/trunk/etc/default/couchdb:642419-694440
Modified: couchdb/branches/0.10.x/src/couchdb/couch_rep.erl
URL:
http://svn.apache.org/viewvc/couchdb/branches/0.10.x/src/couchdb/couch_rep.erl?rev=830742&r1=830741&r2=830742&view=diff
==============================================================================
--- couchdb/branches/0.10.x/src/couchdb/couch_rep.erl (original)
+++ couchdb/branches/0.10.x/src/couchdb/couch_rep.erl Wed Oct 28 19:44:26 2009
@@ -482,53 +482,62 @@
tgt_starttime = TgtInstanceStartTime,
stats = Stats
} = State,
- ?LOG_INFO("recording a checkpoint at source update_seq ~p", [NewSeqNum]),
- RecordSeqNum = case commit_to_both(Source, Target, NewSeqNum) of
+ case commit_to_both(Source, Target, NewSeqNum) of
{SrcInstanceStartTime, TgtInstanceStartTime} ->
- NewSeqNum;
+ ?LOG_INFO("recording a checkpoint for ~s -> ~s at source update_seq ~p",
+ [dbname(Source), dbname(Target), NewSeqNum]),
+ SessionId = couch_uuids:random(),
+ NewHistoryEntry = {[
+ {<<"session_id">>, SessionId},
+ {<<"start_time">>, list_to_binary(ReplicationStartTime)},
+ {<<"end_time">>, list_to_binary(httpd_util:rfc1123_date())},
+ {<<"start_last_seq">>, StartSeqNum},
+ {<<"end_last_seq">>, NewSeqNum},
+ {<<"recorded_seq">>, NewSeqNum},
+ {<<"missing_checked">>, ets:lookup_element(Stats, total_revs, 2)},
+ {<<"missing_found">>, ets:lookup_element(Stats, missing_revs, 2)},
+ {<<"docs_read">>, ets:lookup_element(Stats, docs_read, 2)},
+ {<<"docs_written">>, ets:lookup_element(Stats, docs_written, 2)},
+ {<<"doc_write_failures">>,
+ ets:lookup_element(Stats, doc_write_failures, 2)}
+ ]},
+ % limit history to 50 entries
+ NewRepHistory = {[
+ {<<"session_id">>, SessionId},
+ {<<"source_last_seq">>, NewSeqNum},
+ {<<"history">>, lists:sublist([NewHistoryEntry | OldHistory], 50)}
+ ]},
+
+ try
+ {SrcRevPos,SrcRevId} =
+ update_local_doc(Source, SourceLog#doc{body=NewRepHistory}),
+ {TgtRevPos,TgtRevId} =
+ update_local_doc(Target, TargetLog#doc{body=NewRepHistory}),
+ State#state{
+ checkpoint_scheduled = nil,
+ checkpoint_history = NewRepHistory,
+ source_log = SourceLog#doc{revs={SrcRevPos, [SrcRevId]}},
+ target_log = TargetLog#doc{revs={TgtRevPos, [TgtRevId]}}
+ }
+ catch throw:conflict ->
+ ?LOG_ERROR("checkpoint failure: conflict (are you replicating to "
+ "yourself?)", []),
+ State
+ end;
_Else ->
- ?LOG_INFO("A server has restarted since replication start. "
- "Not recording the new sequence number to ensure the "
- "replication is redone and documents reexamined.", []),
- StartSeqNum
- end,
- SessionId = couch_util:new_uuid(),
- NewHistoryEntry = {[
- {<<"session_id">>, SessionId},
- {<<"start_time">>, list_to_binary(ReplicationStartTime)},
- {<<"end_time">>, list_to_binary(httpd_util:rfc1123_date())},
- {<<"start_last_seq">>, StartSeqNum},
- {<<"end_last_seq">>, NewSeqNum},
- {<<"recorded_seq">>, RecordSeqNum},
- {<<"missing_checked">>, ets:lookup_element(Stats, total_revs, 2)},
- {<<"missing_found">>, ets:lookup_element(Stats, missing_revs, 2)},
- {<<"docs_read">>, ets:lookup_element(Stats, docs_read, 2)},
- {<<"docs_written">>, ets:lookup_element(Stats, docs_written, 2)},
- {<<"doc_write_failures">>,
- ets:lookup_element(Stats, doc_write_failures, 2)}
- ]},
- % limit history to 50 entries
- NewRepHistory = {[
- {<<"session_id">>, SessionId},
- {<<"source_last_seq">>, RecordSeqNum},
- {<<"history">>, lists:sublist([NewHistoryEntry | OldHistory], 50)}
- ]},
-
- try
- {SrcRevPos,SrcRevId} =
- update_local_doc(Source, SourceLog#doc{body=NewRepHistory}),
- {TgtRevPos,TgtRevId} =
- update_local_doc(Target, TargetLog#doc{body=NewRepHistory}),
- State#state{
- checkpoint_scheduled = nil,
- checkpoint_history = NewRepHistory,
- source_log = SourceLog#doc{revs={SrcRevPos, [SrcRevId]}},
- target_log = TargetLog#doc{revs={TgtRevPos, [TgtRevId]}}
- }
- catch throw:conflict ->
- ?LOG_ERROR("checkpoint failure: conflict (are you replicating to yourself?)",
- []),
- State
+ ?LOG_INFO("rebooting ~s -> ~s from last known replication checkpoint",
+ [dbname(Source), dbname(Target)]),
+ #state{
+ changes_feed = CF,
+ missing_revs = MR,
+ reader = Reader,
+ writer = Writer
+ } = State,
+ Pids = [CF, MR, Reader, Writer],
+ [unlink(Pid) || Pid <- Pids],
+ [exit(Pid, shutdown) || Pid <- Pids],
+ {ok, NewState} = init(State#state.init_args),
+ NewState
end.
commit_to_both(Source, Target, RequiredSeq) ->
@@ -565,12 +574,12 @@
InstanceStartTime = NewDb#db.instance_start_time,
couch_db:close(NewDb),
if UpdateSeq > CommitSeq ->
- ?LOG_DEBUG("replication needs a full commit: update ~p commit ~p",
+ ?LOG_DEBUG("target needs a full commit: update ~p commit ~p",
[UpdateSeq, CommitSeq]),
{ok, DbStartTime} = couch_db:ensure_full_commit(Target),
DbStartTime;
true ->
- ?LOG_DEBUG("replication doesn't need a full commit", []),
+ ?LOG_DEBUG("target doesn't need a full commit", []),
InstanceStartTime
end.
@@ -592,9 +601,12 @@
InstanceStartTime = NewDb#db.instance_start_time,
couch_db:close(NewDb),
if RequiredSeq > CommitSeq ->
+ ?LOG_DEBUG("source needs a full commit: required ~p committed ~p",
+ [RequiredSeq, CommitSeq]),
{ok, DbStartTime} = couch_db:ensure_full_commit(Source),
DbStartTime;
true ->
+ ?LOG_DEBUG("source doesn't need a full commit", []),
InstanceStartTime
end.
Modified: couchdb/branches/0.10.x/src/couchdb/couch_rep_reader.erl
URL:
http://svn.apache.org/viewvc/couchdb/branches/0.10.x/src/couchdb/couch_rep_reader.erl?rev=830742&r1=830741&r2=830742&view=diff
==============================================================================
--- couchdb/branches/0.10.x/src/couchdb/couch_rep_reader.erl (original)
+++ couchdb/branches/0.10.x/src/couchdb/couch_rep_reader.erl Wed Oct 28 19:44:26 2009
@@ -262,6 +262,7 @@
maybe_reopen_db(#db{update_seq=OldSeq} = Db, HighSeq) when HighSeq > OldSeq ->
{ok, NewDb} = couch_db:open(Db#db.name, [{user_ctx, Db#db.user_ctx}]),
+ couch_db:close(Db),
NewDb;
maybe_reopen_db(Db, _HighSeq) ->
Db.