This is an automated email from the ASF dual-hosted git repository. vatamane pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/couchdb.git
The following commit(s) were added to refs/heads/master by this push: new fe53e43 Prepare to fabric attachment receiver from a fun closure to a tuple fe53e43 is described below commit fe53e437ca5ec9d23aa1b55d7934daced157a9e3 Author: Nick Vatamaniuc <vatam...@apache.org> AuthorDate: Tue Jun 19 15:05:00 2018 -0400 Prepare to fabric attachment receiver from a fun closure to a tuple Passing closures around is fragile and prevents smooth upgrading. Instead pass a tuple with a data from the receiver closure explicitly and convert to back to a local fun locally on each node. This is a preparatory commit before the switch. To ensure attachment uploading requests are successful, would need to first install this change on all the nodes. Then in a separate upgrade step, switch fabric.erl to call fabric_doc_atts:receiver instead fabric_doc_attatchments:recevier. --- src/fabric/src/fabric_doc_atts.erl | 168 +++++++++++++++++++++++++++++++++++++ src/fabric/src/fabric_rpc.erl | 2 + 2 files changed, 170 insertions(+) diff --git a/src/fabric/src/fabric_doc_atts.erl b/src/fabric/src/fabric_doc_atts.erl new file mode 100644 index 0000000..7ef5dd8 --- /dev/null +++ b/src/fabric/src/fabric_doc_atts.erl @@ -0,0 +1,168 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(fabric_doc_atts). + +-include_lib("fabric/include/fabric.hrl"). +-include_lib("couch/include/couch_db.hrl"). + +-export([ + receiver/2, + receiver_callback/2 +]). + + +receiver(_Req, undefined) -> + <<"">>; +receiver(_Req, {unknown_transfer_encoding, Unknown}) -> + exit({unknown_transfer_encoding, Unknown}); +receiver(Req, chunked) -> + MiddleMan = spawn(fun() -> middleman(Req, chunked) end), + {fabric_attachment_receiver, MiddleMan, chunked}; +receiver(_Req, 0) -> + <<"">>; +receiver(Req, Length) when is_integer(Length) -> + maybe_send_continue(Req), + Middleman = spawn(fun() -> middleman(Req, Length) end), + {fabric_attachment_receiver, Middleman, Length}; +receiver(_Req, Length) -> + exit({length_not_integer, Length}). + + +receiver_callback(Middleman, chunked) -> + fun(4096, ChunkFun, State) -> + write_chunks(Middleman, ChunkFun, State) + end; +receiver_callback(Middleman, Length) when is_integer(Length) -> + fun() -> + Middleman ! {self(), gimme_data}, + Timeout = fabric_util:attachments_timeout(), + receive + {Middleman, Data} -> + rexi:reply(attachment_chunk_received), + Data + after Timeout -> + exit(timeout) + end + end. + + +%% +%% internal +%% + +maybe_send_continue(#httpd{mochi_req = MochiReq} = Req) -> + case couch_httpd:header_value(Req, "expect") of + undefined -> + ok; + Expect -> + case string:to_lower(Expect) of + "100-continue" -> + MochiReq:start_raw_response({100, gb_trees:empty()}); + _ -> + ok + end + end. + +write_chunks(MiddleMan, ChunkFun, State) -> + MiddleMan ! {self(), gimme_data}, + Timeout = fabric_util:attachments_timeout(), + receive + {MiddleMan, ChunkRecordList} -> + rexi:reply(attachment_chunk_received), + case flush_chunks(ChunkRecordList, ChunkFun, State) of + {continue, NewState} -> + write_chunks(MiddleMan, ChunkFun, NewState); + {done, NewState} -> + NewState + end + after Timeout -> + exit(timeout) + end. + +flush_chunks([], _ChunkFun, State) -> + {continue, State}; +flush_chunks([{0, _}], _ChunkFun, State) -> + {done, State}; +flush_chunks([Chunk | Rest], ChunkFun, State) -> + NewState = ChunkFun(Chunk, State), + flush_chunks(Rest, ChunkFun, NewState). + +receive_unchunked_attachment(_Req, 0) -> + ok; +receive_unchunked_attachment(Req, Length) -> + receive {MiddleMan, go} -> + Data = couch_httpd:recv(Req, 0), + MiddleMan ! {self(), Data} + end, + receive_unchunked_attachment(Req, Length - size(Data)). + +middleman(Req, chunked) -> + % spawn a process to actually receive the uploaded data + RcvFun = fun(ChunkRecord, ok) -> + receive {From, go} -> From ! {self(), ChunkRecord} end, ok + end, + Receiver = spawn(fun() -> couch_httpd:recv_chunked(Req,4096,RcvFun,ok) end), + + % take requests from the DB writers and get data from the receiver + N = erlang:list_to_integer(config:get("cluster","n")), + Timeout = fabric_util:attachments_timeout(), + middleman_loop(Receiver, N, [], [], Timeout); + +middleman(Req, Length) -> + Receiver = spawn(fun() -> receive_unchunked_attachment(Req, Length) end), + N = erlang:list_to_integer(config:get("cluster","n")), + Timeout = fabric_util:attachments_timeout(), + middleman_loop(Receiver, N, [], [], Timeout). + +middleman_loop(Receiver, N, Counters0, ChunkList0, Timeout) -> + receive {From, gimme_data} -> + % Figure out how far along this writer (From) is in the list + ListIndex = case fabric_dict:lookup_element(From, Counters0) of + undefined -> 0; + I -> I + end, + + % Talk to the receiver to get another chunk if necessary + ChunkList1 = if ListIndex == length(ChunkList0) -> + Receiver ! {self(), go}, + receive + {Receiver, ChunkRecord} -> + ChunkList0 ++ [ChunkRecord] + end; + true -> ChunkList0 end, + + % reply to the writer + Reply = lists:nthtail(ListIndex, ChunkList1), + From ! {self(), Reply}, + + % Update the counter for this writer + Counters1 = fabric_dict:update_counter(From, length(Reply), Counters0), + + % Drop any chunks that have been sent to all writers + Size = fabric_dict:size(Counters1), + NumToDrop = lists:min([I || {_, I} <- Counters1]), + + {ChunkList3, Counters3} = + if Size == N andalso NumToDrop > 0 -> + ChunkList2 = lists:nthtail(NumToDrop, ChunkList1), + Counters2 = [{F, I-NumToDrop} || {F, I} <- Counters1], + {ChunkList2, Counters2}; + true -> + {ChunkList1, Counters1} + end, + + middleman_loop(Receiver, N, Counters3, ChunkList3, Timeout) + after Timeout -> + exit(Receiver, kill), + ok + end. diff --git a/src/fabric/src/fabric_rpc.erl b/src/fabric/src/fabric_rpc.erl index 913aafe0..60526f4 100644 --- a/src/fabric/src/fabric_rpc.erl +++ b/src/fabric/src/fabric_rpc.erl @@ -440,6 +440,8 @@ make_att_reader({follows, Parser, Ref}) -> throw({mp_parser_died, Reason}) end end; +make_att_reader({fabric_attachment_receiver, Middleman, Length}) -> + fabric_doc_atts:receiver_callback(Middleman, Length); make_att_reader(Else) -> Else.