Author: fdmanana Date: Sun May 15 02:35:25 2011 New Revision: 1103266 URL: http://svn.apache.org/viewvc?rev=1103266&view=rev Log: Bug fix in couch_work_queue: state's size not decremented
When dequeing some items from the queue (that is, not taking all the queued items) the size field of the gen_server's state was not being decremented as it should. However when all the queue items are delivered to a consumer, the size is set to 0. This particular fix has a good impact in the new replicator because it uses work queues and the consumers only dequeue 1 item at a time - producers will no longer be blocked until the queue gets empty. For a push replication that used to take about 15 minutes, it now takes about 13 minutes. Modified: couchdb/trunk/src/couchdb/couch_work_queue.erl Modified: couchdb/trunk/src/couchdb/couch_work_queue.erl URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_work_queue.erl?rev=1103266&r1=1103265&r2=1103266&view=diff ============================================================================== --- couchdb/trunk/src/couchdb/couch_work_queue.erl (original) +++ couchdb/trunk/src/couchdb/couch_work_queue.erl Sun May 15 02:35:25 2011 @@ -13,6 +13,8 @@ -module(couch_work_queue). -behaviour(gen_server). +-include("couch_db.hrl"). + % public API -export([new/1, queue/2, dequeue/1, dequeue/2, close/1]). @@ -37,8 +39,10 @@ new(Options) -> gen_server:start_link(couch_work_queue, Options, []). +queue(Wq, Item) when is_binary(Item) -> + gen_server:call(Wq, {queue, Item, byte_size(Item)}, infinity); queue(Wq, Item) -> - gen_server:call(Wq, {queue, Item}, infinity). + gen_server:call(Wq, {queue, Item, byte_size(?term_to_bin(Item))}, infinity). dequeue(Wq) -> @@ -70,10 +74,10 @@ terminate(_Reason, #q{work_waiters=Worke lists:foreach(fun({W, _}) -> gen_server:reply(W, closed) end, Workers). -handle_call({queue, Item}, From, #q{work_waiters = []} = Q0) -> - Q = Q0#q{size = increment_queue_size(Q0, Item), +handle_call({queue, Item, Size}, From, #q{work_waiters = []} = Q0) -> + Q = Q0#q{size = Q0#q.size + Size, % increment_queue_size(Q0, Item), items = Q0#q.items + 1, - queue = queue:in(Item, Q0#q.queue)}, + queue = queue:in({Item, Size}, Q0#q.queue)}, case (Q#q.size >= Q#q.max_size) orelse (Q#q.items >= Q#q.max_items) of true -> @@ -82,7 +86,7 @@ handle_call({queue, Item}, From, #q{work {reply, ok, Q} end; -handle_call({queue, Item}, _From, #q{work_waiters = [{W, _Max} | Rest]} = Q) -> +handle_call({queue, Item, _}, _From, #q{work_waiters = [{W, _Max} | Rest]} = Q) -> gen_server:reply(W, {ok, [Item]}), {reply, ok, Q#q{work_waiters = Rest}}; @@ -107,38 +111,44 @@ deliver_queue_items(Max, Q) -> #q{ queue = Queue, items = Count, + size = Size, close_on_dequeue = Close, blocked = Blocked } = Q, case (Max =:= all) orelse (Max >= Count) of false -> - {Items, Queue2, Blocked2} = dequeue_items(Max, Queue, Blocked, []), - Q2 = Q#q{items = Count - Max, blocked = Blocked2, queue = Queue2}, + {Items, Size2, Queue2, Blocked2} = dequeue_items( + Max, Size, Queue, Blocked, []), + Q2 = Q#q{ + items = Count - Max, size = Size2, blocked = Blocked2, queue = Queue2 + }, {reply, {ok, Items}, Q2}; true -> lists:foreach(fun(F) -> gen_server:reply(F, ok) end, Blocked), Q2 = Q#q{items = 0, size = 0, blocked = [], queue = queue:new()}, + Items = [Item || {Item, _} <- queue:to_list(Queue)], case Close of false -> - {reply, {ok, queue:to_list(Queue)}, Q2}; + {reply, {ok, Items}, Q2}; true -> - {stop, normal, {ok, queue:to_list(Queue)}, Q2} + {stop, normal, {ok, Items}, Q2} end end. -dequeue_items(0, Queue, Blocked, DequeuedAcc) -> - {lists:reverse(DequeuedAcc), Queue, Blocked}; +dequeue_items(0, Size, Queue, Blocked, DequeuedAcc) -> + {lists:reverse(DequeuedAcc), Size, Queue, Blocked}; -dequeue_items(NumItems, Queue, Blocked, DequeuedAcc) -> - {{value, Item}, Queue2} = queue:out(Queue), +dequeue_items(NumItems, Size, Queue, Blocked, DequeuedAcc) -> + {{value, {Item, ItemSize}}, Queue2} = queue:out(Queue), case Blocked of [] -> Blocked2 = Blocked; [From | Blocked2] -> gen_server:reply(From, ok) end, - dequeue_items(NumItems - 1, Queue2, Blocked2, [Item | DequeuedAcc]). + dequeue_items( + NumItems - 1, Size - ItemSize, Queue2, Blocked2, [Item | DequeuedAcc]). handle_cast(close, #q{items = 0} = Q) -> @@ -153,10 +163,3 @@ code_change(_OldVsn, State, _Extra) -> handle_info(X, Q) -> {stop, X, Q}. - -increment_queue_size(#q{max_size = nil, size = Size}, _Item) -> - Size; -increment_queue_size(#q{size = Size}, Item) when is_binary(Item) -> - Size + byte_size(Item); -increment_queue_size(#q{size = Size}, Item) -> - Size + byte_size(term_to_binary(Item)).