Author: fdmanana
Date: Sun Sep 11 18:51:45 2011
New Revision: 1169498

URL: http://svn.apache.org/viewvc?rev=1169498&view=rev
Log:
Make sure httpc pool doesn't reuse dead connections

Race conditions allowed dead http connections to be added back to the httpc pool and then be given back to clients. In extreme cases this could lead to a pool full of dead http connections. Issue identified by Dale Harvey. Thanks. Added: couchdb/trunk/test/etap/230-httpc-pool.t (with props) Modified: couchdb/trunk/src/couchdb/couch_httpc_pool.erl couchdb/trunk/test/etap/Makefile.am Modified: couchdb/trunk/src/couchdb/couch_httpc_pool.erl URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_httpc_pool.erl?rev=1169498&r1=1169497&r2=1169498&view=diff ============================================================================== --- couchdb/trunk/src/couchdb/couch_httpc_pool.erl (original) +++ couchdb/trunk/src/couchdb/couch_httpc_pool.erl Sun Sep 11 18:51:45 2011 @@ -84,21 +84,27 @@ handle_call(stop, _From, State) -> handle_cast({release_worker, Worker}, #state{waiting = Waiting} = State) -> - case queue:out(Waiting) of - {empty, Waiting2} -> - Busy2 = State#state.busy -- [Worker], - Free2 = [Worker | State#state.free]; - {{value, From}, Waiting2} -> - gen_server:reply(From, {ok, Worker}), - Busy2 = State#state.busy, - Free2 = State#state.free - end, - NewState = State#state{ - busy = Busy2, - free = Free2, - waiting = Waiting2 - }, - {noreply, NewState}. + case is_process_alive(Worker) andalso + lists:member(Worker, State#state.busy) of + true -> + case queue:out(Waiting) of + {empty, Waiting2} -> + Busy2 = State#state.busy -- [Worker], + Free2 = [Worker | State#state.free]; + {{value, From}, Waiting2} -> + gen_server:reply(From, {ok, Worker}), + Busy2 = State#state.busy, + Free2 = State#state.free + end, + NewState = State#state{ + busy = Busy2, + free = Free2, + waiting = Waiting2 + }, + {noreply, NewState}; + false -> + {noreply, State} + end. 
handle_info({'EXIT', Pid, _Reason}, #state{busy = Busy, free = Free} = State) -> @@ -108,7 +114,14 @@ handle_info({'EXIT', Pid, _Reason}, #sta Busy -> {noreply, State}; Busy2 -> - {noreply, State#state{busy = Busy2}} + case queue:out(State#state.waiting) of + {empty, _} -> + {noreply, State#state{busy = Busy2}}; + {{value, From}, Waiting2} -> + {ok, Worker} = ibrowse:spawn_link_worker_process(State#state.url), + gen_server:reply(From, {ok, Worker}), + {noreply, State#state{busy = [Worker | Busy2], waiting = Waiting2}} + end end; Free2 -> {noreply, State#state{free = Free2}} Added: couchdb/trunk/test/etap/230-httpc-pool.t URL: http://svn.apache.org/viewvc/couchdb/trunk/test/etap/230-httpc-pool.t?rev=1169498&view=auto ============================================================================== --- couchdb/trunk/test/etap/230-httpc-pool.t (added) +++ couchdb/trunk/test/etap/230-httpc-pool.t Sun Sep 11 18:51:45 2011 @@ -0,0 +1,250 @@ +#!/usr/bin/env escript +%% -*- erlang -*- +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +main(_) -> + test_util:init_code_path(), + + etap:plan(55), + case (catch test()) of + ok -> + etap:end_tests(); + Other -> + etap:diag(io_lib:format("Test died abnormally: ~p", [Other])), + etap:bail(Other) + end, + ok. + + +test() -> + couch_server_sup:start_link(test_util:config_files()), + ibrowse:start(), + + test_pool_full(), + test_worker_dead_pool_non_full(), + test_worker_dead_pool_full(), + + couch_server_sup:stop(), + ok. 
+ + +test_pool_full() -> + Pool = spawn_pool(), + Client1 = spawn_client(Pool), + Client2 = spawn_client(Pool), + Client3 = spawn_client(Pool), + + etap:diag("Check that we can spawn the max number of connections."), + etap:is(ping_client(Client1), ok, "Client 1 started ok."), + etap:is(ping_client(Client2), ok, "Client 2 started ok."), + etap:is(ping_client(Client3), ok, "Client 3 started ok."), + + Worker1 = get_client_worker(Client1, "1"), + Worker2 = get_client_worker(Client2, "2"), + Worker3 = get_client_worker(Client3, "3"), + etap:is(is_process_alive(Worker1), true, "Client's 1 worker is alive."), + etap:is(is_process_alive(Worker2), true, "Client's 2 worker is alive."), + etap:is(is_process_alive(Worker3), true, "Client's 3 worker is alive."), + + etap:isnt(Worker1, Worker2, "Clients 1 and 2 got different workers."), + etap:isnt(Worker2, Worker3, "Clients 2 and 3 got different workers."), + etap:isnt(Worker1, Worker3, "Clients 1 and 3 got different workers."), + + etap:diag("Check that client 4 blocks waiting for a worker."), + Client4 = spawn_client(Pool), + etap:is(ping_client(Client4), timeout, "Client 4 blocked while waiting."), + + etap:diag("Check that stopping a client gives up its worker."), + etap:is(stop_client(Client1), ok, "First client stopped."), + + etap:diag("And check that our blocked client has been unblocked."), + etap:is(ping_client(Client4), ok, "Client 4 was unblocked."), + + Worker4 = get_client_worker(Client4, "4"), + etap:is(is_process_alive(Worker4), true, "Client's 4 worker is alive."), + etap:is(Worker4, Worker1, "Client 4 got worker that client 1 got before."), + + lists:foreach(fun(C) -> ok = stop_client(C) end, [Client2, Client3, Client4]), + stop_pool(Pool). 
+ + +test_worker_dead_pool_non_full() -> + Pool = spawn_pool(), + Client1 = spawn_client(Pool), + + etap:is(ping_client(Client1), ok, "Client 1 started ok."), + Worker1 = get_client_worker(Client1, "1"), + etap:is(is_process_alive(Worker1), true, "Client's 1 worker is alive."), + + etap:diag("Kill client's 1 worker."), + etap:is(kill_client_worker(Client1), ok, "Killed client's 1 worker."), + etap:is(is_process_alive(Worker1), false, "Client's 1 worker process is dead."), + + etap:is(stop_client(Client1), ok, "First client stopped and released its worker."), + + Client2 = spawn_client(Pool), + etap:is(ping_client(Client2), ok, "Client 2 started ok."), + Worker2 = get_client_worker(Client2, "2"), + etap:isnt(Worker2, Worker1, "Client 2 got a different worker from client 1"), + etap:is(is_process_alive(Worker2), true, "Client's 2 worker is alive."), + + etap:is(stop_client(Client2), ok, "Second client stopped."), + stop_pool(Pool). + + +test_worker_dead_pool_full() -> + Pool = spawn_pool(), + Client1 = spawn_client(Pool), + Client2 = spawn_client(Pool), + Client3 = spawn_client(Pool), + + etap:diag("Check that we can spawn the max number of connections."), + etap:is(ping_client(Client1), ok, "Client 1 started ok."), + etap:is(ping_client(Client2), ok, "Client 2 started ok."), + etap:is(ping_client(Client3), ok, "Client 3 started ok."), + + Worker1 = get_client_worker(Client1, "1"), + Worker2 = get_client_worker(Client2, "2"), + Worker3 = get_client_worker(Client3, "3"), + etap:is(is_process_alive(Worker1), true, "Client's 1 worker is alive."), + etap:is(is_process_alive(Worker2), true, "Client's 2 worker is alive."), + etap:is(is_process_alive(Worker3), true, "Client's 3 worker is alive."), + + etap:isnt(Worker1, Worker2, "Clients 1 and 2 got different workers."), + etap:isnt(Worker2, Worker3, "Clients 2 and 3 got different workers."), + etap:isnt(Worker1, Worker3, "Clients 1 and 3 got different workers."), + + etap:diag("Check that client 4 blocks waiting for a 
worker."), + Client4 = spawn_client(Pool), + etap:is(ping_client(Client4), timeout, "Client 4 blocked while waiting."), + + etap:diag("Kill client's 1 worker."), + etap:is(kill_client_worker(Client1), ok, "Killed client's 1 worker."), + etap:is(is_process_alive(Worker1), false, "Client's 1 worker process is dead."), + + etap:diag("Check client 4 got unblocked after first worker's death"), + etap:is(ping_client(Client4), ok, "Client 4 not blocked anymore."), + + Worker4 = get_client_worker(Client4, "4"), + etap:is(is_process_alive(Worker4), true, "Client's 4 worker is alive."), + etap:isnt(Worker4, Worker1, "Client 4 got a worker different from client 1."), + etap:isnt(Worker4, Worker2, "Client 4 got a worker different from client 2."), + etap:isnt(Worker4, Worker3, "Client 4 got a worker different from client 3."), + + etap:diag("Check that stopping client 1 is a noop."), + etap:is(stop_client(Client1), ok, "First client stopped."), + + etap:is(is_process_alive(Worker2), true, "Client's 2 worker still alive."), + etap:is(is_process_alive(Worker3), true, "Client's 3 worker still alive."), + etap:is(is_process_alive(Worker4), true, "Client's 4 worker still alive."), + + etap:diag("Check that client 5 blocks waiting for a worker."), + Client5 = spawn_client(Pool), + etap:is(ping_client(Client5), timeout, "Client 5 blocked while waiting."), + + etap:diag("Check that stopping client 2 gives up its worker."), + etap:is(stop_client(Client2), ok, "Second client stopped."), + + etap:diag("Now check that client 5 has been unblocked."), + etap:is(ping_client(Client5), ok, "Client 5 was unblocked."), + + Worker5 = get_client_worker(Client5, "5"), + etap:is(is_process_alive(Worker5), true, "Client's 5 worker is alive."), + etap:isnt(Worker5, Worker1, "Client 5 got a worker different from client 1."), + etap:is(Worker5, Worker2, "Client 5 got same worker as client 2."), + etap:isnt(Worker5, Worker3, "Client 5 got a worker different from client 3."), + etap:isnt(Worker5, Worker4, 
"Client 5 got a worker different from client 4."), + + etap:is(is_process_alive(Worker3), true, "Client's 3 worker still alive."), + etap:is(is_process_alive(Worker4), true, "Client's 4 worker still alive."), + etap:is(is_process_alive(Worker5), true, "Client's 5 worker still alive."), + + lists:foreach(fun(C) -> ok = stop_client(C) end, [Client3, Client4, Client5]), + stop_pool(Pool). + + +spawn_client(Pool) -> + Parent = self(), + Ref = make_ref(), + Pid = spawn(fun() -> + {ok, Worker} = couch_httpc_pool:get_worker(Pool), + loop(Parent, Ref, Worker, Pool) + end), + {Pid, Ref}. + + +ping_client({Pid, Ref}) -> + Pid ! ping, + receive + {pong, Ref} -> + ok + after 3000 -> + timeout + end. + + +get_client_worker({Pid, Ref}, ClientName) -> + Pid ! get_worker, + receive + {worker, Ref, Worker} -> + Worker + after 3000 -> + etap:bail("Timeout getting client " ++ ClientName ++ " worker.") + end. + + +stop_client({Pid, Ref}) -> + Pid ! stop, + receive + {stop, Ref} -> + ok + after 3000 -> + timeout + end. + + +kill_client_worker({Pid, Ref}) -> + Pid ! get_worker, + receive + {worker, Ref, Worker} -> + exit(Worker, kill), + ok + after 3000 -> + timeout + end. + + +loop(Parent, Ref, Worker, Pool) -> + receive + ping -> + Parent ! {pong, Ref}, + loop(Parent, Ref, Worker, Pool); + get_worker -> + Parent ! {worker, Ref, Worker}, + loop(Parent, Ref, Worker, Pool); + stop -> + couch_httpc_pool:release_worker(Pool, Worker), + Parent ! {stop, Ref} + end. + + +spawn_pool() -> + Host = couch_config:get("httpd", "bind_address", "127.0.0.1"), + Port = couch_config:get("httpd", "port", "5984"), + {ok, Pool} = couch_httpc_pool:start_link( + "http://" ++ Host ++ ":5984", [{max_connections, 3}]), + Pool. + + +stop_pool(Pool) -> + ok = couch_httpc_pool:stop(Pool). 
Propchange: couchdb/trunk/test/etap/230-httpc-pool.t
------------------------------------------------------------------------------
    svn:executable = *

Modified: couchdb/trunk/test/etap/Makefile.am
URL: http://svn.apache.org/viewvc/couchdb/trunk/test/etap/Makefile.am?rev=1169498&r1=1169497&r2=1169498&view=diff
==============================================================================
--- couchdb/trunk/test/etap/Makefile.am (original)
+++ couchdb/trunk/test/etap/Makefile.am Sun Sep 11 18:51:45 2011
@@ -85,4 +85,5 @@ EXTRA_DIST = \
     190-json-stream-parse.t \
     200-view-group-no-db-leaks.t \
     210-os-proc-pool.t \
-    220-compaction-daemon.t
+    220-compaction-daemon.t \
+    230-httpc-pool.t