Improve compaction efficiency with a temp file. Databases that are updated randomly by doc id cause compaction to be noticeably less efficient than possible. This is due to the order in which the id_tree is written. The random updates cause the append-only btree to generate a noticeable amount of waste.
This patch works by writing the id_tree to a temporary file during compaction and then streams this btree back to the compaction file during a final pass. The temporary file is managed by the new couch_emsort module which runs an external merge sort using append only file storage. This speeds up the insertion of document ids into the temporary file by delaying the major merging until the final phase of insertion back into the main compaction file. Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch/repo Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch/commit/5d3753d0 Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch/tree/5d3753d0 Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch/diff/5d3753d0 Branch: refs/heads/import Commit: 5d3753d0662cfa676fdf65d0a543be205499ec11 Parents: e11b351 Author: Paul J. Davis <[email protected]> Authored: Mon Sep 26 19:52:06 2011 -0500 Committer: Paul J. Davis <[email protected]> Committed: Fri Jan 17 16:44:29 2014 -0800 ---------------------------------------------------------------------- Makefile.am | 2 + src/couch_db_updater.erl | 353 ++++++++++++++++++++++++++++++++++-------- src/couch_emsort.erl | 318 +++++++++++++++++++++++++++++++++++++ src/couch_server.erl | 8 +- 4 files changed, 611 insertions(+), 70 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/5d3753d0/Makefile.am ---------------------------------------------------------------------- diff --git a/Makefile.am b/Makefile.am index ae78cb9..63c557e 100644 --- a/Makefile.am +++ b/Makefile.am @@ -42,6 +42,7 @@ source_files = \ src/couch_doc.erl \ src/couch_drv.erl \ src/couch_ejson_compare.erl \ + src/couch_emsort.erl \ src/couch_event_sup.erl \ src/couch_external_manager.erl \ src/couch_external_server.erl \ @@ -102,6 +103,7 @@ compiled_files = \ ebin/couch_doc.beam \ ebin/couch_drv.beam \ ebin/couch_ejson_compare.beam \ + ebin/couch_emsort.beam 
\ ebin/couch_event_sup.beam \ ebin/couch_external_manager.beam \ ebin/couch_external_server.beam \ http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/5d3753d0/src/couch_db_updater.erl ---------------------------------------------------------------------- diff --git a/src/couch_db_updater.erl b/src/couch_db_updater.erl index 21a1fb5..c3e1ac3 100644 --- a/src/couch_db_updater.erl +++ b/src/couch_db_updater.erl @@ -20,6 +20,18 @@ -include_lib("couch/include/couch_db.hrl"). +-record(comp_header, { + db_header, + meta_state +}). + +-record(merge_st, { + id_tree, + seq_tree, + curr, + rem_seqs, + infos +}). init({DbName, Filepath, Fd, Options}) -> case lists:member(create, Options) of @@ -29,7 +41,9 @@ init({DbName, Filepath, Fd, Options}) -> ok = couch_file:write_header(Fd, Header), % delete any old compaction files that might be hanging around RootDir = couch_config:get("couchdb", "database_dir", "."), - couch_file:delete(RootDir, Filepath ++ ".compact"); + couch_file:delete(RootDir, Filepath ++ ".compact"), + couch_file:delete(RootDir, Filepath ++ ".compact.data"), + couch_file:delete(RootDir, Filepath ++ ".compact.meta"); false -> case couch_file:read_header(Fd) of {ok, Header} -> @@ -39,7 +53,9 @@ init({DbName, Filepath, Fd, Options}) -> Header = #db_header{}, ok = couch_file:write_header(Fd, Header), % delete any old compaction files that might be hanging around - file:delete(Filepath ++ ".compact") + file:delete(Filepath ++ ".compact"), + file:delete(Filepath ++ ".compact.data"), + file:delete(Filepath ++ ".compact.meta") end end, Db = init_db(DbName, Filepath, Fd, Header, Options), @@ -211,9 +227,13 @@ handle_cast({compact_done, CompactFilepath}, #db{filepath=Filepath}=Db) -> ?LOG_DEBUG("CouchDB swapping files ~s and ~s.", [Filepath, CompactFilepath]), + ok = file:rename(CompactFilepath, Filepath ++ ".compact"), RootDir = couch_config:get("couchdb", "database_dir", "."), couch_file:delete(RootDir, Filepath), - ok = file:rename(CompactFilepath, Filepath), + 
ok = file:rename(Filepath ++ ".compact", Filepath), + % Delete the old meta compaction file after promoting + % the compaction file. + couch_file:delete(RootDir, Filepath ++ ".compact.meta"), close_db(Db), NewDb3 = refresh_validate_doc_funs(NewDb2), ok = gen_server:call(couch_server, {db_updated, NewDb3}, infinity), @@ -773,10 +793,6 @@ update_local_docs(#db{local_tree=Btree}=Db, Docs) -> {ok, Db#db{local_tree = Btree2}}. - -commit_data(Db) -> - commit_data(Db, false). - db_to_header(Db, Header) -> Header#db_header{ update_seq = Db#db.update_seq, @@ -786,40 +802,49 @@ db_to_header(Db, Header) -> security_ptr = Db#db.security_ptr, revs_limit = Db#db.revs_limit}. +commit_data(Db) -> + commit_data(Db, false). + commit_data(#db{waiting_delayed_commit=nil} = Db, true) -> - Db#db{waiting_delayed_commit=erlang:send_after(1000,self(),delayed_commit)}; + TRef = erlang:send_after(1000,self(),delayed_commit), + Db#db{waiting_delayed_commit=TRef}; commit_data(Db, true) -> Db; commit_data(Db, _) -> #db{ - fd = Fd, - filepath = Filepath, header = OldHeader, - fsync_options = FsyncOptions, waiting_delayed_commit = Timer } = Db, if is_reference(Timer) -> erlang:cancel_timer(Timer); true -> ok end, case db_to_header(Db, OldHeader) of - OldHeader -> - Db#db{waiting_delayed_commit=nil}; - Header -> - case lists:member(before_header, FsyncOptions) of - true -> ok = couch_file:sync(Filepath); - _ -> ok - end, + OldHeader -> Db#db{waiting_delayed_commit=nil}; + NewHeader -> sync_header(Db, NewHeader) + end. - ok = couch_file:write_header(Fd, Header), +sync_header(Db, NewHeader) -> + #db{ + fd = Fd, + filepath = FilePath, + fsync_options = FsyncOptions, + waiting_delayed_commit = Timer + } = Db, - case lists:member(after_header, FsyncOptions) of - true -> ok = couch_file:sync(Filepath); - _ -> ok - end, + if is_reference(Timer) -> erlang:cancel_timer(Timer); true -> ok end, - Db#db{waiting_delayed_commit=nil, - header=Header, - committed_update_seq=Db#db.update_seq} - end. 
+ Before = lists:member(before_header, FsyncOptions), + After = lists:member(after_header, FsyncOptions), + if Before -> couch_file:sync(FilePath); true -> ok end, + ok = couch_file:write_header(Fd, NewHeader), + if After -> couch_file:sync(FilePath); true -> ok end, + + Db2 = Db#db{ + header=NewHeader, + committed_update_seq=Db#db.update_seq, + waiting_delayed_commit=nil + }, + tally:update(Db2), + Db2. copy_doc_attachments(#db{fd = SrcFd} = SrcDb, SrcSp, DestFd) -> {ok, {BodyData, BinInfos0}} = couch_db:read_doc(SrcDb, SrcSp), @@ -899,29 +924,32 @@ copy_docs(Db, #db{fd = DestFd} = NewDb, MixedInfos, Retry) -> NewInfos = stem_full_doc_infos(Db, NewInfos1), RemoveSeqs = case Retry of - false -> + nil -> []; - true -> - % We are retrying a compaction, meaning the documents we are copying may - % already exist in our file and must be removed from the by_seq index. + OldDocIdTree -> + % Compaction is being rerun to catch up to writes during the + % first pass. This means we may have docs that already exist + % in the seq_tree in the .data file. Here we lookup any old + % update_seqs so that they can be removed. Ids = [Id || #full_doc_info{id=Id} <- NewInfos], - Existing = couch_btree:lookup(NewDb#db.id_tree, Ids), + Existing = couch_btree:lookup(OldDocIdTree, Ids), [Seq || {ok, #full_doc_info{update_seq=Seq}} <- Existing] end, {ok, SeqTree} = couch_btree:add_remove( NewDb#db.seq_tree, NewInfos, RemoveSeqs), - {ok, IdTree} = couch_btree:add_remove( - NewDb#db.id_tree, NewInfos, []), - update_compact_task(length(NewInfos)), - NewDb#db{id_tree=IdTree, seq_tree=SeqTree}. + FDIKVs = lists:map(fun(#full_doc_info{id=Id, update_seq=Seq}=FDI) -> + {{Id, Seq}, FDI} + end, NewInfos), + {ok, IdEms} = couch_emsort:add(NewDb#db.id_tree, FDIKVs), + update_compact_task(length(NewInfos)), + NewDb#db{id_tree=IdEms, seq_tree=SeqTree}. 
copy_compact(Db, NewDb0, Retry) -> - FsyncOptions = [Op || Op <- NewDb0#db.fsync_options, Op == before_header], Compression = couch_compress:get_compression_method(), - NewDb = NewDb0#db{fsync_options=FsyncOptions, compression=Compression}, + NewDb = NewDb0#db{compression=Compression}, TotalChanges = couch_db:count_changes_since(Db, NewDb#db.update_seq), BufferSize = list_to_integer( couch_config:get("database_compaction", "doc_buffer_size", "524288")), @@ -944,7 +972,8 @@ copy_compact(Db, NewDb0, Retry) -> Db, AccNewDb, lists:reverse([DocInfo | AccUncopied]), Retry), AccCopiedSize2 = AccCopiedSize + AccUncopiedSize2, if AccCopiedSize2 >= CheckpointAfter -> - {ok, {commit_data(NewDb2#db{update_seq = Seq}), [], 0, 0}}; + CommNewDb2 = commit_compaction_data(NewDb2#db{update_seq=Seq}), + {ok, {CommNewDb2, [], 0, 0}}; true -> {ok, {NewDb2#db{update_seq = Seq}, [], 0, AccCopiedSize2}} end; @@ -961,7 +990,7 @@ copy_compact(Db, NewDb0, Retry) -> {changes_done, 0}, {total_changes, TotalChanges} ], - case Retry and couch_task_status:is_task_added() of + case (Retry =/= nil) and couch_task_status:is_task_added() of true -> couch_task_status:update([ {retry, true}, @@ -991,39 +1020,226 @@ copy_compact(Db, NewDb0, Retry) -> NewDb4 = NewDb3 end, - commit_data(NewDb4#db{update_seq=Db#db.update_seq}). + commit_compaction_data(NewDb4#db{update_seq=Db#db.update_seq}). 
+ -start_copy_compact(#db{name=Name,filepath=Filepath,header=#db_header{purge_seq=PurgeSeq}}=Db) -> - CompactFile = Filepath ++ ".compact", +start_copy_compact(#db{}=Db) -> + #db{name=Name, filepath=Filepath, options=Options} = Db, ?LOG_DEBUG("Compaction process spawned for db \"~s\"", [Name]), - case couch_file:open(CompactFile) of - {ok, Fd} -> - Retry = true, - case couch_file:read_header(Fd) of - {ok, Header} -> - ok; - no_valid_header -> - ok = couch_file:write_header(Fd, Header=#db_header{}) - end; - {error, enoent} -> - {ok, Fd} = couch_file:open(CompactFile, [create]), - Retry = false, - ok = couch_file:write_header(Fd, Header=#db_header{}) - end, - NewDb = init_db(Name, CompactFile, Fd, Header, Db#db.options), - NewDb2 = if PurgeSeq > 0 -> - {ok, PurgedIdsRevs} = couch_db:get_last_purged(Db), - {ok, Pointer, _} = couch_file:append_term( - Fd, PurgedIdsRevs, [{compression, NewDb#db.compression}]), - NewDb#db{header=Header#db_header{purge_seq=PurgeSeq, purged_docs=Pointer}}; + + {ok, NewDb, DName, DFd, MFd, Retry} = + open_compaction_files(Name, Filepath, Options), + erlang:monitor(process, MFd), + + % This is a bit worrisome. init_db/4 will monitor the data fd + % but it doesn't know about the meta fd. For now I'll maintain + % that the data fd is the old normal fd and meta fd is special + % and hope everything works out for the best. + unlink(DFd), + + NewDb1 = copy_purge_info(Db, NewDb), + NewDb2 = copy_compact(Db, NewDb1, Retry), + NewDb3 = sort_meta_data(NewDb2), + NewDb4 = commit_compaction_data(NewDb3), + NewDb5 = copy_meta_data(NewDb4), + NewDb6 = sync_header(NewDb5, db_to_header(NewDb5, NewDb5#db.header)), + close_db(NewDb6), + + ok = couch_file:close(MFd), + gen_server:cast(Db#db.main_pid, {compact_done, DName}). 
+ + +open_compaction_files(DbName, DbFilePath, Options) -> + DataFile = DbFilePath ++ ".compact.data", + MetaFile = DbFilePath ++ ".compact.meta", + {ok, DataFd, DataHdr} = open_compaction_file(DataFile), + {ok, MetaFd, MetaHdr} = open_compaction_file(MetaFile), + case {DataHdr, MetaHdr} of + {#comp_header{}=A, #comp_header{}=A} -> + DbHeader = A#comp_header.db_header, + Db0 = init_db(DbName, DataFile, DataFd, DbHeader, Options), + Db1 = bind_emsort(Db0, MetaFd, A#comp_header.meta_state), + {ok, Db1, DataFile, DataFd, MetaFd, Db0#db.id_tree}; + {#db_header{}, _} -> + ok = reset_compaction_file(MetaFd, #db_header{}), + Db0 = init_db(DbName, DataFile, DataFd, DataHdr, Options), + Db1 = bind_emsort(Db0, MetaFd, nil), + {ok, Db1, DataFile, DataFd, MetaFd, Db0#db.id_tree}; + _ -> + Header = #db_header{}, + ok = reset_compaction_file(DataFd, Header), + ok = reset_compaction_file(MetaFd, Header), + Db0 = init_db(DbName, DataFile, DataFd, Header, Options), + Db1 = bind_emsort(Db0, MetaFd, nil), + {ok, Db1, DataFile, DataFd, MetaFd, nil} + end. + + +open_compaction_file(FilePath) -> + case couch_file:open(FilePath) of + {ok, Fd} -> + case couch_file:read_header(Fd) of + {ok, Header} -> {ok, Fd, Header}; + no_valid_header -> {ok, Fd, nil} + end; + {error, enoent} -> + {ok, Fd} = couch_file:open(FilePath, [create]), + {ok, Fd, nil} + end. + + +reset_compaction_file(Fd, Header) -> + ok = couch_file:truncate(Fd, 0), + ok = couch_file:write_header(Fd, Header). + + +copy_purge_info(OldDb, NewDb) -> + OldHdr = OldDb#db.header, + NewHdr = NewDb#db.header, + if OldHdr#db_header.purge_seq > 0 -> + {ok, PurgedIdsRevs} = couch_db:get_last_purged(OldDb), + Opts = [{compression, NewDb#db.compression}], + {ok, Ptr, _} = couch_file:append_term(NewDb#db.fd, PurgedIdsRevs, Opts), + NewDb#db{ + header=NewHdr#db_header{ + purge_seq=OldHdr#db_header.purge_seq, + purged_docs=Ptr + } + }; true -> NewDb - end, - unlink(Fd), + end. 
+ + +commit_compaction_data(#db{}=Db) -> + % Compaction needs to write headers to both the data file + % and the meta file so if we need to restart we can pick + % back up from where we left off. + commit_compaction_data(Db, couch_emsort:get_fd(Db#db.id_tree)), + commit_compaction_data(Db, Db#db.fd). + + +commit_compaction_data(#db{header=OldHeader}=Db0, Fd) -> + % Mostly copied from commit_data/2 but I have to + % replace the logic to commit and fsync to a specific + % fd instead of the Filepath stuff that commit_data/2 + % does. + DataState = OldHeader#db_header.id_tree_state, + MetaFd = couch_emsort:get_fd(Db0#db.id_tree), + MetaState = couch_emsort:get_state(Db0#db.id_tree), + Db1 = bind_id_tree(Db0, Db0#db.fd, DataState), + Header = db_to_header(Db1, OldHeader), + CompHeader = #comp_header{ + db_header = Header, + meta_state = MetaState + }, + ok = couch_file:sync(Fd), + ok = couch_file:write_header(Fd, CompHeader), + Db2 = Db1#db{ + waiting_delayed_commit=nil, + header=Header, + committed_update_seq=Db1#db.update_seq + }, + bind_emsort(Db2, MetaFd, MetaState). + + +bind_emsort(Db, Fd, nil) -> + {ok, Ems} = couch_emsort:open(Fd), + Db#db{id_tree=Ems}; +bind_emsort(Db, Fd, State) -> + {ok, Ems} = couch_emsort:open(Fd, [{root, State}]), + Db#db{id_tree=Ems}. + + +bind_id_tree(Db, Fd, State) -> + {ok, IdBtree} = couch_btree:open(State, Fd, [ + {split, fun ?MODULE:btree_by_id_split/1}, + {join, fun ?MODULE:btree_by_id_join/2}, + {reduce, fun ?MODULE:btree_by_id_reduce/2} + ]), + Db#db{id_tree=IdBtree}. + + +sort_meta_data(Db0) -> + {ok, Ems} = couch_emsort:merge(Db0#db.id_tree), + Db0#db{id_tree=Ems}. 
+ + +copy_meta_data(#db{fd=Fd, header=Header}=Db) -> + Src = Db#db.id_tree, + DstState = Header#db_header.id_tree_state, + {ok, IdTree0} = couch_btree:open(DstState, Fd, [ + {split, fun ?MODULE:btree_by_id_split/1}, + {join, fun ?MODULE:btree_by_id_join/2}, + {reduce, fun ?MODULE:btree_by_id_reduce/2} + ]), + {ok, Iter} = couch_emsort:iter(Src), + Acc0 = #merge_st{ + id_tree=IdTree0, + seq_tree=Db#db.seq_tree, + rem_seqs=[], + infos=[] + }, + Acc = merge_docids(Iter, Acc0), + {ok, IdTree} = couch_btree:add(Acc#merge_st.id_tree, Acc#merge_st.infos), + {ok, SeqTree} = couch_btree:add_remove( + Acc#merge_st.seq_tree, [], Acc#merge_st.rem_seqs + ), + Db#db{id_tree=IdTree, seq_tree=SeqTree}. + + +merge_docids(Iter, #merge_st{infos=Infos}=Acc) when length(Infos) > 1000 -> + #merge_st{ + id_tree=IdTree0, + seq_tree=SeqTree0, + rem_seqs=RemSeqs + } = Acc, + {ok, IdTree1} = couch_btree:add(IdTree0, Infos), + {ok, SeqTree1} = couch_btree:add_remove(SeqTree0, [], RemSeqs), + Acc1 = Acc#merge_st{ + id_tree=IdTree1, + seq_tree=SeqTree1, + rem_seqs=[], + infos=[] + }, + merge_docids(Iter, Acc1); +merge_docids(Iter, #merge_st{curr=Curr}=Acc) -> + case next_info(Iter, Curr, []) of + {NextIter, NewCurr, FDI, Seqs} -> + Acc1 = Acc#merge_st{ + infos = [FDI | Acc#merge_st.infos], + rem_seqs = Seqs ++ Acc#merge_st.rem_seqs, + curr = NewCurr + }, + merge_docids(NextIter, Acc1); + {finished, FDI, Seqs} -> + Acc#merge_st{ + infos = [FDI | Acc#merge_st.infos], + rem_seqs = Seqs ++ Acc#merge_st.rem_seqs, + curr = undefined + }; + empty -> + Acc + end. 
+ + +next_info(Iter, undefined, []) -> + case couch_emsort:next(Iter) of + {ok, {{Id, Seq}, FDI}, NextIter} -> + next_info(NextIter, {Id, Seq, FDI}, []); + finished -> + empty + end; +next_info(Iter, {Id, Seq, FDI}, Seqs) -> + case couch_emsort:next(Iter) of + {ok, {{Id, NSeq}, NFDI}, NextIter} -> + next_info(NextIter, {Id, NSeq, NFDI}, [Seq | Seqs]); + {ok, {{NId, NSeq}, NFDI}, NextIter} -> + {NextIter, {NId, NSeq, NFDI}, FDI, Seqs}; + finished -> + {finished, FDI, Seqs} + end. - NewDb3 = copy_compact(Db, NewDb2, Retry), - close_db(NewDb3), - gen_server:cast(Db#db.main_pid, {compact_done, CompactFile}). update_compact_task(NumChanges) -> [Changes, Total] = couch_task_status:get([changes_done, total_changes]), @@ -1036,6 +1252,7 @@ update_compact_task(NumChanges) -> end, couch_task_status:update([{changes_done, Changes2}, {progress, Progress}]). + make_doc_summary(#db{compression = Comp}, {Body0, Atts0}) -> Body = case couch_compress:is_compressed(Body0, Comp) of true -> http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/5d3753d0/src/couch_emsort.erl ---------------------------------------------------------------------- diff --git a/src/couch_emsort.erl b/src/couch_emsort.erl new file mode 100644 index 0000000..2a25a23 --- /dev/null +++ b/src/couch_emsort.erl @@ -0,0 +1,318 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_emsort). + +% This is an implementation of an external N-way merge sort. 
Its primary +% purpose is to be used during database compaction as an optimization for +% managing the docid btree. +% +% Trunk currently writes the docid btree as it's compacting the database but +% this is quite inefficient as it's written out of order in the general case +% as writes are ordered by update_seq. +% +% The general design of this module is a very standard merge sort with one +% caveat due to append only files. This is described in more detail in the +% sorting phase. +% +% The basic algorithm is in two halves. The first half stores KV pairs to disk +% which is then followed by the actual sorting phase that streams KVs back +% to the client using a fold-like function. After some basic definitions we'll +% describe both phases. +% +% Key/Value pairs (aka, KV pairs, or KVs) are simply lists of two-tuples with +% a key as the first element and an arbitrary value as the second. The key of +% this pair is what is used to determine the sort order based on native Erlang +% term comparison. +% +% Internally, KVs are stored as lists with a max size defined by +% #ems.chain_chunk. These lists are then chained together on disk using disk +% offsets as a poor man's linked list. The basic format of a list looks like +% {KVs, DiskOffset} where DiskOffset is either the atom nil which means "end +% of the list" or an integer that is a file position offset that is the +% location of another {KVs, DiskOffset} term. The head of each list is +% referred to with a single DiskOffset. The set of terms that extend from +% this initial DiskOffset to the last {KVs, nil} term is referred to in the +% code as a chain. Two important facts are that one call to couch_emsort:add/2 +% creates a single chain, and that a chain is always sorted on disk (though it's +% possible to be sorted in descending order which will be discussed later). +% +% The second major internal structure is the back bone.
This is a list of +% chains that has a quite similar structure to chains but contains different +% data types and has no guarantee on ordering. The back bone is merely the +% list of all head DiskOffsets. The structure has the similar structure of +% {DiskOffsets, DiskOffset} that we use for chains, except that DiskOffsets is +% a list of integers that refer to the heads of chains. The maximum size of +% DiskOffsets is defined by #ems.bb_chunk. It is important to note that the +% backbone has no defined ordering. The other thing of note is that the RAM +% bounds are loosely defined as: +% +% #ems.bb_chunk * #ems.chain_chunk * avg_size(KV). +% +% Build Phase +% ----------- +% +% As mentioned, each call to couch_emsort:add/2 creates a chain from the +% list of KVs that are passed in. This list is first sorted and then the +% chain is created by foldr-ing (note: r) across the list to build the +% chain on disk. It is important to note that the final chain is then +% sorted in ascending order on disk. +% +% +% Sort Phase +% ---------- +% +% The sort phase is where the merge sort kicks in. This is generally your +% average merge sort with a caveat for append only storage. First the +% general outline. +% +% The general outline for this sort is that it iteratively merges chains +% in the backbone until less than #ems.bb_chunk chains exist. At this +% point it switches to the last merge sort phase where it just streams +% the sorted KVs back to the client using a fold function. +% +% The general chain merging is a pretty standard merge sort. You load up +% the initial KVs from each phase, pick the next one in sort order and +% then when you run out of KVs you're left with a single DiskOffset for +% the head of a single chain that represents the merge. These new +% DiskOffsets are used to build the new back bone. +% +% The one caveat here is that we're using append only storage. 
This is +% important because once we make a pass we've effectively reversed the +% sort order of each chain. I.e., the first merge results in chains that +% are ordered in descending order. Since one pass reverses the list, +% the trick is that each phase does two passes. The first phase picks +% the smallest KV to write next and the second phase picks the largest. +% In this manner each time we do a back bone merge we end up with chains +% that are always sorted in an ascending order. +% +% The one downfall is that in the interest of simplicity the sorting is +% restricted to Erlang's native term sorting. A possible extension would +% be to allow two comparison functions to be used, but this module is +% currently only used for docid sorting which is hardcoded to be raw +% Erlang ordering. +% +% Diagram +% ------- +% +% If it helps, this is a general diagram of the internal structures. A +% couple points to note since this is ASCII art. The BB pointers across +% the top are lists of chains going down. Each BBN item is one of the +% {DiskOffsets, DiskOffset} structures discussed earlier. Going down, +% the CMN nodes are actually representing #ems.bb_chunk chains in parallel +% going off the back bone. It is important and not represented in this +% diagram that within these groups the chains don't have to be the same +% length. That's just a limitation of my ASCII artistic abilities. +% +% The BBN* node is marked with a * to denote that it is the only state +% that we store when writing headers to disk as it has pointers that +% lead us to all data in the tree. +% +% BB1 <- BB2 <- BB3 <- BBN* +% | | | | +% v v v v +% CA1 CB1 CC1 CD1 +% | | | +% v v v +% CA2 CC2 CD2 +% | | +% v v +% CA3 CD3 +% + +-export([open/1, open/2, get_fd/1, get_state/1]). +-export([add/2, merge/1, sort/1, iter/1, next/1]). + + +-record(ems, { + fd, + root, + bb_chunk = 10, + chain_chunk = 100 +}). + + +open(Fd) -> + {ok, #ems{fd=Fd}}.
+ + +open(Fd, Options) -> + {ok, set_options(#ems{fd=Fd}, Options)}. + + +set_options(Ems, []) -> + Ems; +set_options(Ems, [{root, Root} | Rest]) -> + set_options(Ems#ems{root=Root}, Rest); +set_options(Ems, [{chain_chunk, Count} | Rest]) when is_integer(Count) -> + set_options(Ems#ems{chain_chunk=Count}, Rest); +set_options(Ems, [{back_bone_chunk, Count} | Rest]) when is_integer(Count) -> + set_options(Ems#ems{bb_chunk=Count}, Rest). + + +get_fd(#ems{fd=Fd}) -> + Fd. + + +get_state(#ems{root=Root}) -> + Root. + + +add(Ems, []) -> + {ok, Ems}; +add(Ems, KVs) -> + Pos = write_kvs(Ems, KVs), + {ok, add_bb_pos(Ems, Pos)}. + + +sort(#ems{}=Ems) -> + {ok, Ems1} = merge(Ems), + iter(Ems1). + + +merge(#ems{root=undefined}=Ems) -> + {ok, Ems}; +merge(#ems{}=Ems) -> + {ok, decimate(Ems)}. + + +iter(#ems{root=undefined}=Ems) -> + {ok, {Ems, []}}; +iter(#ems{root={BB, nil}}=Ems) -> + Chains = init_chains(Ems, small, BB), + {ok, {Ems, Chains}}; +iter(#ems{root={_, _}}) -> + {error, not_merged}. + + +next({_Ems, []}) -> + finished; +next({Ems, Chains}) -> + {KV, RestChains} = choose_kv(small, Ems, Chains), + {ok, KV, {Ems, RestChains}}. + + +add_bb_pos(#ems{root=undefined}=Ems, Pos) -> + Ems#ems{root={[Pos], nil}}; +add_bb_pos(#ems{root={BB, Prev}}=Ems, Pos) -> + {NewBB, NewPrev} = append_item(Ems, {BB, Prev}, Pos, Ems#ems.bb_chunk), + Ems#ems{root={NewBB, NewPrev}}. + + +write_kvs(Ems, KVs) -> + % Write the list of KV's to disk in sorted order in chunks + % of 100. Also make sure that the order is so that they + % can be streamed in asscending order. + {LastKVs, LastPos} = + lists:foldr(fun(KV, Acc) -> + append_item(Ems, Acc, KV, Ems#ems.chain_chunk) + end, {[], nil}, lists:sort(KVs)), + {ok, Final, _} = couch_file:append_term(Ems#ems.fd, {LastKVs, LastPos}), + Final. + + +decimate(#ems{root={_BB, nil}}=Ems) -> + % We have less than bb_chunk backbone pointers so we're + % good to start streaming KV's back to the client. 
+ Ems; +decimate(#ems{root={BB, NextBB}}=Ems) -> + % To make sure we have a bounded amount of data in RAM + % at any given point we first need to decimate the data + % by performing the first couple iterations of a merge + % sort writing the intermediate results back to disk. + + % The first pass gives us a sort with pointers linked from + % largest to smallest. + {RevBB, RevNextBB} = merge_back_bone(Ems, small, BB, NextBB), + + % We have to run a second pass so that links are pointed + % back from smallest to largest. + {FwdBB, FwdNextBB} = merge_back_bone(Ems, big, RevBB, RevNextBB), + + % Continue deicmating until we have an acceptable bound on + % the number of keys to use. + decimate(Ems#ems{root={FwdBB, FwdNextBB}}). + + +merge_back_bone(Ems, Choose, BB, NextBB) -> + BBPos = merge_chains(Ems, Choose, BB), + merge_rest_back_bone(Ems, Choose, NextBB, {[BBPos], nil}). + + +merge_rest_back_bone(_Ems, _Choose, nil, Acc) -> + Acc; +merge_rest_back_bone(Ems, Choose, BBPos, Acc) -> + {ok, {BB, NextBB}} = couch_file:pread_term(Ems#ems.fd, BBPos), + NewPos = merge_chains(Ems, Choose, BB), + {NewBB, NewPrev} = append_item(Ems, Acc, NewPos, Ems#ems.bb_chunk), + merge_rest_back_bone(Ems, Choose, NextBB, {NewBB, NewPrev}). + + +merge_chains(Ems, Choose, BB) -> + Chains = init_chains(Ems, Choose, BB), + merge_chains(Ems, Choose, Chains, {[], nil}). + + +merge_chains(Ems, _Choose, [], ChainAcc) -> + {ok, CPos, _} = couch_file:append_term(Ems#ems.fd, ChainAcc), + CPos; +merge_chains(#ems{chain_chunk=CC}=Ems, Choose, Chains, Acc) -> + {KV, RestChains} = choose_kv(Choose, Ems, Chains), + {NewKVs, NewPrev} = append_item(Ems, Acc, KV, CC), + merge_chains(Ems, Choose, RestChains, {NewKVs, NewPrev}). + + +init_chains(Ems, Choose, BB) -> + Chains = lists:map(fun(CPos) -> + {ok, {KVs, NextKVs}} = couch_file:pread_term(Ems#ems.fd, CPos), + {KVs, NextKVs} + end, BB), + order_chains(Choose, Chains). 
+ + +order_chains(small, Chains) -> lists:sort(Chains); +order_chains(big, Chains) -> lists:reverse(lists:sort(Chains)). + + +choose_kv(_Choose, _Ems, [{[KV], nil} | Rest]) -> + {KV, Rest}; +choose_kv(Choose, Ems, [{[KV], Pos} | RestChains]) -> + {ok, Chain} = couch_file:pread_term(Ems#ems.fd, Pos), + case Choose of + small -> {KV, ins_small_chain(RestChains, Chain, [])}; + big -> {KV, ins_big_chain(RestChains, Chain, [])} + end; +choose_kv(Choose, _Ems, [{[KV | RestKVs], Prev} | RestChains]) -> + case Choose of + small -> {KV, ins_small_chain(RestChains, {RestKVs, Prev}, [])}; + big -> {KV, ins_big_chain(RestChains, {RestKVs, Prev}, [])} + end. + + +ins_small_chain([{[{K1,_}|_],_}=C1|Rest], {[{K2,_}|_],_}=C2, Acc) when K1<K2 -> + ins_small_chain(Rest, C2, [C1 | Acc]); +ins_small_chain(Rest, Chain, Acc) -> + lists:reverse(Acc, [Chain | Rest]). + + +ins_big_chain([{[{K1,_}|_],_}=C1|Rest], {[{K2,_}|_],_}=C2, Acc) when K1>K2 -> + ins_big_chain(Rest, C2, [C1 | Acc]); +ins_big_chain(Rest, Chain, Acc) -> + lists:reverse(Acc, [Chain | Rest]). + + +append_item(Ems, {List, Prev}, Pos, Size) when length(List) >= Size -> + {ok, PrevList, _} = couch_file:append_term(Ems#ems.fd, {List, Prev}), + {[Pos], PrevList}; +append_item(_Ems, {List, Prev}, Pos, _Size) -> + {[Pos | List], Prev}. + http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/5d3753d0/src/couch_server.erl ---------------------------------------------------------------------- diff --git a/src/couch_server.erl b/src/couch_server.erl index 4e0fe01..40dc3f9 100644 --- a/src/couch_server.erl +++ b/src/couch_server.erl @@ -397,8 +397,12 @@ handle_call({delete, DbName, _Options}, _From, Server) -> db_closed(Server, Db#db.options) end, - %% Delete any leftover .compact files. If we don't do this a subsequent - %% request for this DB will try to open the .compact file and use it. + %% Delete any leftover compaction files. 
If we don't do this a + %% subsequent request for this DB will try to open them to use + %% as a recovery. + lists:foreach(fun(Ext) -> + couch_file:delete(Server#server.root_dir, FullFilepath ++ Ext) + end, [".compact", ".compact.data", ".compact.meta"]), couch_file:delete(Server#server.root_dir, FullFilepath ++ ".compact"), case couch_file:delete(Server#server.root_dir, FullFilepath) of
