Updated Branches: refs/heads/master af7441d8d -> 093d2aa65
add Server-Sent Events protocol to db changes API. close #COUCHDB-986 This patch add support for the new specification of w3c by adding a new feed type named `eventsource`: http://www.w3.org/TR/2009/WD-eventsource-20090423/ This patch is based on @indutny patch with edits. Project: http://git-wip-us.apache.org/repos/asf/couchdb/repo Commit: http://git-wip-us.apache.org/repos/asf/couchdb/commit/093d2aa6 Tree: http://git-wip-us.apache.org/repos/asf/couchdb/tree/093d2aa6 Diff: http://git-wip-us.apache.org/repos/asf/couchdb/diff/093d2aa6 Branch: refs/heads/master Commit: 093d2aa6544546a95f6133f1db3c4f4179793f3c Parents: af7441d Author: benoitc <[email protected]> Authored: Wed May 16 07:30:19 2012 +0200 Committer: benoitc <[email protected]> Committed: Wed May 16 07:30:19 2012 +0200 ---------------------------------------------------------------------- share/www/script/test/changes.js | 28 ++++++++++++++++++++++++++++ src/couchdb/couch_changes.erl | 15 ++++++++++----- src/couchdb/couch_httpd_db.erl | 24 ++++++++++++++++++++++-- 3 files changed, 60 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/couchdb/blob/093d2aa6/share/www/script/test/changes.js ---------------------------------------------------------------------- diff --git a/share/www/script/test/changes.js b/share/www/script/test/changes.js index 19e22fd..c529b21 100644 --- a/share/www/script/test/changes.js +++ b/share/www/script/test/changes.js @@ -139,6 +139,34 @@ couchTests.changes = function(debug) { // otherwise we'll continue to receive heartbeats forever xhr.abort(); + // test Server Sent Event (eventsource) + if (window.EventSource) { + var source = new EventSource( + "/test_suite_db/_changes?feed=eventsource"); + var results = []; + var sourceListener = function(e) { + var data = JSON.parse(e.data); + results.push(data); + + }; + + source.addEventListener('message', sourceListener , false); + + waitForSuccess(function() { + if (results.length != 3) + throw "bad seq, try again"; + }); + + source.removeEventListener('message', sourceListener, false); + + T(results[0].seq == 1); + T(results[0].id == "foo"); + + T(results[1].seq == 2); + T(results[1].id == "bar"); + T(results[1].changes[0].rev == docBar._rev); + } + // test longpolling xhr = CouchDB.newXhr(); http://git-wip-us.apache.org/repos/asf/couchdb/blob/093d2aa6/src/couchdb/couch_changes.erl ---------------------------------------------------------------------- diff --git a/src/couchdb/couch_changes.erl b/src/couchdb/couch_changes.erl index aec7873..85c9e54 100644 --- a/src/couchdb/couch_changes.erl +++ b/src/couchdb/couch_changes.erl @@ -63,7 +63,8 @@ handle_changes(Args1, Req, Db0) -> put(last_changes_heartbeat, now()) end, - if Feed == "continuous" orelse Feed == "longpoll" -> + case lists:member(Feed, ["continuous", "longpoll", "eventsource"]) of + true -> fun(CallbackAcc) -> {Callback, UserAcc} = get_callback_acc(CallbackAcc), Self = self(), @@ -89,7 +90,7 @@ handle_changes(Args1, Req, Db0) -> get_rest_db_updated(ok) % clean out any remaining update messages end end; - true -> + false -> fun(CallbackAcc) -> {Callback, UserAcc} = get_callback_acc(CallbackAcc), UserAcc2 = start_sending_changes(Callback, UserAcc, Feed), @@ -261,7 +262,9 @@ get_changes_timeout(Args, Callback) -> fun(UserAcc) -> {ok, Callback(timeout, ResponseType, UserAcc)} end} end. -start_sending_changes(_Callback, UserAcc, "continuous") -> +start_sending_changes(_Callback, UserAcc, ResponseType) + when ResponseType =:= "continuous" + orelse ResponseType =:= "eventsource" -> UserAcc; start_sending_changes(Callback, UserAcc, ResponseType) -> Callback(start, ResponseType, UserAcc). @@ -434,7 +437,9 @@ keep_sending_changes(Args, Acc0, FirstRound) -> end_sending_changes(Callback, UserAcc, EndSeq, ResponseType) -> Callback({stop, EndSeq}, ResponseType, UserAcc). -changes_enumerator(DocInfo, #changes_acc{resp_type = "continuous"} = Acc) -> +changes_enumerator(DocInfo, #changes_acc{resp_type = ResponseType} = Acc) + when ResponseType =:= "continuous" + orelse ResponseType =:= "eventsource" -> #changes_acc{ filter = FilterFun, callback = Callback, user_acc = UserAcc, limit = Limit, db = Db, @@ -456,7 +461,7 @@ changes_enumerator(DocInfo, #changes_acc{resp_type = "continuous"} = Acc) -> end; _ -> ChangesRow = changes_row(Results, DocInfo, Acc), - UserAcc2 = Callback({change, ChangesRow, <<>>}, "continuous", UserAcc), + UserAcc2 = Callback({change, ChangesRow, <<>>}, ResponseType, UserAcc), reset_heartbeat(), {Go, Acc#changes_acc{seq = Seq, user_acc = UserAcc2, limit = Limit - 1}} end; http://git-wip-us.apache.org/repos/asf/couchdb/blob/093d2aa6/src/couchdb/couch_httpd_db.erl ---------------------------------------------------------------------- diff --git a/src/couchdb/couch_httpd_db.erl b/src/couchdb/couch_httpd_db.erl index de39b9e..0920014 100644 --- a/src/couchdb/couch_httpd_db.erl +++ b/src/couchdb/couch_httpd_db.erl @@ -76,14 +76,23 @@ handle_changes_req1(Req, #db{name=DbName}=Db) -> handle_changes_req2(Req, Db) -> MakeCallback = fun(Resp) -> - fun({change, Change, _}, "continuous") -> + fun({change, {ChangeProp}=Change, _}, "eventsource") -> + Seq = proplists:get_value(<<"seq">>, ChangeProp), + send_chunk(Resp, ["data: ", ?JSON_ENCODE(Change), + "\n", "id: ", ?JSON_ENCODE(Seq), + "\n\n"]); + ({change, Change, _}, "continuous") -> send_chunk(Resp, [?JSON_ENCODE(Change) | "\n"]); ({change, Change, Prepend}, _) -> send_chunk(Resp, [Prepend, ?JSON_ENCODE(Change)]); + (start, "eventsource") -> + ok; (start, "continuous") -> ok; (start, _) -> send_chunk(Resp, "{\"results\":[\n"); + ({stop, _EndSeq}, "eventsource") -> + end_json_response(Resp); ({stop, EndSeq}, "continuous") -> send_chunk( Resp, @@ -118,6 +127,15 @@ handle_changes_req2(Req, Db) -> end ) end; + "eventsource" -> + Headers = [ + {"Content-Type", "text/event-stream"}, + {"Cache-Control", "no-cache"} + ], + {ok, Resp} = couch_httpd:start_json_response(Req, 200, Headers), + fun(FeedChangesFun) -> + FeedChangesFun(MakeCallback(Resp)) + end; _ -> % "longpoll" or "continuous" {ok, Resp} = couch_httpd:start_json_response(Req, 200), @@ -1097,13 +1115,15 @@ parse_doc_query(Req) -> parse_changes_query(Req) -> lists:foldl(fun({Key, Value}, Args) -> - case {Key, Value} of + case {string:to_lower(Key), Value} of {"feed", _} -> Args#changes_args{feed=Value}; {"descending", "true"} -> Args#changes_args{dir=rev}; {"since", _} -> Args#changes_args{since=list_to_integer(Value)}; + {"last-event-id", _} -> + Args#changes_args{since=list_to_integer(Value)}; {"limit", _} -> Args#changes_args{limit=list_to_integer(Value)}; {"style", _} ->
