Use a record for changes_callback accumulator This change allows us to evolve the accumulator in a less-brittle way and sets the stage for new data to be held in the accumulator to address COUCHDB-2724.
In the course of this change I also switched the feed labels from strings to atoms (they're only used for pattern matching in the accumulator, and multiple matches are executed for every row in the feed, so it seemed silly to be using Erlang lists for that comparison), and I explicitly indicated when we start a chunked response instead of guessing it heuristically based on other contents in the accumulator. COUCHDB-2724 Project: http://git-wip-us.apache.org/repos/asf/couchdb-chttpd/repo Commit: http://git-wip-us.apache.org/repos/asf/couchdb-chttpd/commit/f45c8b2c Tree: http://git-wip-us.apache.org/repos/asf/couchdb-chttpd/tree/f45c8b2c Diff: http://git-wip-us.apache.org/repos/asf/couchdb-chttpd/diff/f45c8b2c Branch: refs/heads/master Commit: f45c8b2c4262c682000b5e2ac50d16c9cade3ec6 Parents: 87fa816 Author: Adam Kocoloski <[email protected]> Authored: Mon Jun 22 19:41:37 2015 -0400 Committer: Adam Kocoloski <[email protected]> Committed: Mon Jun 22 20:57:55 2015 -0400 ---------------------------------------------------------------------- src/chttpd_db.erl | 80 ++++++++++++++++++++++++++++++++------------------ 1 file changed, 51 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/couchdb-chttpd/blob/f45c8b2c/src/chttpd_db.erl ---------------------------------------------------------------------- diff --git a/src/chttpd_db.erl b/src/chttpd_db.erl index 017f832..71c6cec 100644 --- a/src/chttpd_db.erl +++ b/src/chttpd_db.erl @@ -34,6 +34,15 @@ atts_since = nil }). +% Accumulator for changes_callback function +-record(cacc, { + etag, + feed, + mochi, + prepend = "", + responding = false +}). + -define(IS_ALL_DOCS(T), ( T == <<"_all_docs">> orelse T == <<"_local_docs">> @@ -82,13 +91,14 @@ handle_changes_req1(#httpd{}=Req, Db) -> DeltaT = timer:now_diff(os:timestamp(), T0) / 1000, couch_stats:update_histogram([couchdb, dbinfo], DeltaT), chttpd:etag_respond(Req, Etag, fun() -> - fabric:changes(Db, fun changes_callback/2, {"normal", {"Etag",Etag}, Req}, - ChangesArgs) + Acc0 = #cacc{feed = normal, etag = Etag, mochi = Req}, + fabric:changes(Db, fun changes_callback/2, Acc0, ChangesArgs) end); Feed when Feed =:= "continuous"; Feed =:= "longpoll"; Feed =:= "eventsource" -> couch_stats:increment_counter([couchdb, httpd, clients_requesting_changes]), + Acc0 = #cacc{feed = list_to_atom(Feed), mochi = Req}, try - fabric:changes(Db, fun changes_callback/2, {Feed, Req}, ChangesArgs) + fabric:changes(Db, fun changes_callback/2, Acc0, ChangesArgs) after couch_stats:decrement_counter([couchdb, httpd, clients_requesting_changes]) end; @@ -98,13 +108,15 @@ handle_changes_req1(#httpd{}=Req, Db) -> end. % callbacks for continuous feed (newline-delimited JSON Objects) -changes_callback(start, {"continuous", Req}) -> - {ok, Resp} = chttpd:start_delayed_json_response(Req, 200), - {ok, {"continuous", Resp}}; -changes_callback({change, Change}, {"continuous", Resp}) -> +changes_callback(start, #cacc{feed = continuous} = Acc) -> + {ok, Resp} = chttpd:start_delayed_json_response(Acc#cacc.mochi, 200), + {ok, Acc#cacc{mochi = Resp, responding = true}}; +changes_callback({change, Change}, #cacc{feed = continuous} = Acc) -> + #cacc{mochi = Resp} = Acc, {ok, Resp1} = chttpd:send_delayed_chunk(Resp, [?JSON_ENCODE(Change) | "\n"]), - {ok, {"continuous", Resp1}}; -changes_callback({stop, EndSeq0, Pending}, {"continuous", Resp}) -> + {ok, Acc#cacc{mochi = Resp1}}; +changes_callback({stop, EndSeq0, Pending}, #cacc{feed = continuous} = Acc) -> + #cacc{mochi = Resp} = Acc, EndSeq = case is_old_couch(Resp) of true -> 0; false -> EndSeq0 end, Row = {[ {<<"last_seq">>, EndSeq}, @@ -114,14 +126,16 @@ changes_callback({stop, EndSeq0, Pending}, {"continuous", Resp}) -> chttpd:end_delayed_json_response(Resp1); % callbacks for eventsource feed (newline-delimited eventsource Objects) -changes_callback(start, {"eventsource", Req}) -> +changes_callback(start, #cacc{feed = eventsource} = Acc) -> + #cacc{mochi = Req} = Acc, Headers = [ {"Content-Type", "text/event-stream"}, {"Cache-Control", "no-cache"} ], {ok, Resp} = chttpd:start_delayed_json_response(Req, 200, Headers), - {ok, {"eventsource", Resp}}; -changes_callback({change, {ChangeProp}=Change}, {"eventsource", Resp}) -> + {ok, Acc#cacc{mochi = Resp, responding = true}}; +changes_callback({change, {ChangeProp}=Change}, #cacc{feed = eventsource} = Acc) -> + #cacc{mochi = Resp} = Acc, Seq = proplists:get_value(seq, ChangeProp), Chunk = [ "data: ", ?JSON_ENCODE(Change), @@ -129,28 +143,34 @@ changes_callback({change, {ChangeProp}=Change}, {"eventsource", Resp}) -> "\n\n" ], {ok, Resp1} = chttpd:send_delayed_chunk(Resp, Chunk), - {ok, {"eventsource", Resp1}}; -changes_callback(timeout, {"eventsource", Resp}) -> + {ok, Acc#cacc{mochi = Resp1}}; +changes_callback(timeout, #cacc{feed = eventsource} = Acc) -> + #cacc{mochi = Resp} = Acc, Chunk = "event: heartbeat\ndata: \n\n", {ok, Resp1} = chttpd:send_delayed_chunk(Resp, Chunk), {ok, {"eventsource", Resp1}}; -changes_callback({stop, _EndSeq}, {"eventsource", Resp}) -> +changes_callback({stop, _EndSeq}, #cacc{feed = eventsource} = Acc) -> + Resp = Acc#cacc.mochi, chttpd:end_delayed_json_response(Resp); % callbacks for longpoll and normal (single JSON Object) -changes_callback(start, {"normal", {"Etag", Etag}, Req}) -> +changes_callback(start, #cacc{feed = normal} = Acc) -> + #cacc{etag = Etag, mochi = Req} = Acc, FirstChunk = "{\"results\":[\n", {ok, Resp} = chttpd:start_delayed_json_response(Req, 200, [{"Etag",Etag}], FirstChunk), - {ok, {"", Resp}}; -changes_callback(start, {_, Req}) -> + {ok, Acc#cacc{mochi = Resp, responding = true}}; +changes_callback(start, Acc) -> + #cacc{mochi = Req} = Acc, FirstChunk = "{\"results\":[\n", {ok, Resp} = chttpd:start_delayed_json_response(Req, 200, [], FirstChunk), - {ok, {"", Resp}}; -changes_callback({change, Change}, {Prepend, Resp}) -> + {ok, Acc#cacc{mochi = Resp, responding = true}}; +changes_callback({change, Change}, Acc) -> + #cacc{prepend = Prepend, mochi = Resp} = Acc, {ok, Resp1} = chttpd:send_delayed_chunk(Resp, [Prepend, ?JSON_ENCODE(Change)]), - {ok, {",\r\n", Resp1}}; -changes_callback({stop, EndSeq, Pending}, {_, Resp}) -> + {ok, Acc#cacc{prepend = ",\r\n", mochi = Resp1}}; +changes_callback({stop, EndSeq, Pending}, Acc) -> + #cacc{mochi = Resp} = Acc, {ok, Resp1} = case is_old_couch(Resp) of true -> chttpd:send_delayed_chunk(Resp, "\n],\n\"last_seq\":0}\n"); @@ -165,15 +185,17 @@ changes_callback({stop, EndSeq, Pending}, {_, Resp}) -> end, chttpd:end_delayed_json_response(Resp1); -changes_callback(timeout, {Prepend, Resp}) -> - {ok, Resp1} = chttpd:send_delayed_chunk(Resp, "\n"), - {ok, {Prepend, Resp1}}; -changes_callback({error, Reason}, {_, #httpd{}=Req}) -> +changes_callback(timeout, Acc) -> + {ok, Resp1} = chttpd:send_delayed_chunk(Acc#cacc.mochi, "\n"), + {ok, Acc#cacc{mochi = Resp1}}; +changes_callback({error, Reason}, #cacc{mochi = #httpd{}} = Acc) -> + #cacc{mochi = Req} = Acc, chttpd:send_error(Req, Reason); -changes_callback({error, Reason}, {"normal", {"Etag", _Etag}, Req}) -> +changes_callback({error, Reason}, #cacc{feed = normal, responding = false} = Acc) -> + #cacc{mochi = Req} = Acc, chttpd:send_error(Req, Reason); -changes_callback({error, Reason}, {_, Resp}) -> - chttpd:send_delayed_error(Resp, Reason). +changes_callback({error, Reason}, Acc) -> + chttpd:send_delayed_error(Acc#cacc.mochi, Reason). is_old_couch(Resp) -> MochiReq = chttpd:get_delayed_req(Resp),
