Fix bugs with couch_proc_manager limits

This fixes the couch_proc_manager limit counting by rearranging the increments
and decrements when processes are created and destroyed. It ensures that each
time we remove a process from the ets table we decrement appropriately.

For incrementing, things are a bit more complicated in that we need to 
increment before inserting to the table. This is so that our hard limit applies 
even if one of our asynchronous spawn calls is opening a new process. This is 
accomplished by incrementing the counter and storing the async open call 
information in a new ets table. If the open is successful the counter is left 
untouched. If the open fails then we need to decrement the counter.

This also simplifies starting waiting clients when a process is returned,
exits, or fails to start by isolating the logic and calling it in
each place as necessary.

Closes COUCHDB-2321


Branch: refs/heads/master
Commit: db58e794f937a52b6b61c964942e56afa7d03d8b
Parents: 28a7f57
Author: Paul J. Davis <>
Authored: Fri Sep 5 17:35:14 2014 -0500
Committer: Paul J. Davis <>
Committed: Fri Sep 5 18:18:59 2014 -0500

 src/couch_proc_manager.erl | 484 +++++++++++++++++++++++-----------------
 1 file changed, 276 insertions(+), 208 deletions(-)
diff --git a/src/couch_proc_manager.erl b/src/couch_proc_manager.erl
index 9232fd7..8f29c00 100644
--- a/src/couch_proc_manager.erl
+++ b/src/couch_proc_manager.erl
@@ -39,12 +39,16 @@
+-define(PROCS, couch_proc_manager_procs).
+-define(WAITERS, couch_proc_manager_waiters).
+-define(OPENING, couch_proc_manager_opening).
 -record(state, {
-    tab,
-    proc_counts,
-    waiting,
-    threshold_ts
+    counts,
+    threshold_ts,
+    hard_limit,
+    soft_limit
 -record(client, {
@@ -58,7 +62,7 @@
 -record(proc_int, {
-    client = nil,
+    client,
     ddoc_keys = [],
@@ -66,51 +70,70 @@
     t0 = os:timestamp()
 start_link() ->
     gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
 get_proc_count() ->
     gen_server:call(?MODULE, get_proc_count).
 get_stale_proc_count() ->
     gen_server:call(?MODULE, get_stale_proc_count).
 reload() ->
-    gen_server:call(?MODULE, bump_threshold_ts).
+    gen_server:call(?MODULE, set_threshold_ts).
 terminate_stale_procs() ->
     gen_server:call(?MODULE, terminate_stale_procs).
 init([]) ->
     process_flag(trap_exit, true),
-    ok = config:listen_for_changes(?MODULE, nil),
+    ok = config:listen_for_changes(?MODULE, undefined),
+    TableOpts = [public, named_table, ordered_set],
+    ets:new(?PROCS, TableOpts ++ [{keypos,}]),
+    ets:new(?WAITERS, TableOpts ++ [{keypos, #client.timestamp}]),
+    ets:new(?OPENING, [public, named_table, set]),
     {ok, #state{
-        tab = ets:new(procs, [ordered_set, {keypos,}]),
         config = get_proc_config(),
-        proc_counts = dict:new(),
-        waiting = ets:new(couch_proc_manage_waiting,
-                [ordered_set, {keypos, #client.timestamp}])
+        counts = dict:new(),
+        threshold_ts = os:timestamp(),
+        hard_limit = get_hard_limit(),
+        soft_limit = get_soft_limit()
-handle_call(get_table, _From, State) ->
-    {reply,, State};
+terminate(_Reason, _State) ->
+    ets:foldl(fun(#proc_int{pid=P}, _) ->
+        couch_util:shutdown_sync(P)
+    end, 0, ?PROCS),
+    ok.
 handle_call(get_proc_count, _From, State) ->
-    {reply, ets:info(, size), State};
+    NumProcs = ets:info(?PROCS, size),
+    NumOpening = ets:info(?OPENING, size),
+    {reply, NumProcs + NumOpening, State};
 handle_call(get_stale_proc_count, _From, State) ->
-    #state{tab = Tab, threshold_ts = T0} = State,
+    #state{threshold_ts = T0} = State,
     MatchSpec = [{#proc_int{t0='$1', _='_'}, [{'<', '$1', T0}], [true]}],
-    {reply, ets:select_count(Tab, MatchSpec), State};
+    {reply, ets:select_count(?PROCS, MatchSpec), State};
 handle_call({get_proc, #doc{body={Props}}=DDoc, DDocKey}, From, State) ->
     {ClientPid, _} = From,
-    Lang = couch_util:to_binary(
-            couch_util:get_value(<<"language">>, Props, <<"javascript">>)),
+    LangStr = couch_util:get_value(<<"language">>, Props, <<"javascript">>),
+    Lang = couch_util:to_binary(LangStr),
     IterFun = fun(Proc, Acc) ->
         case lists:member(DDocKey, Proc#proc_int.ddoc_keys) of
             true ->
-                {stop, assign_proc(, ClientPid, Proc)};
+                {stop, assign_proc(ClientPid, Proc)};
             false ->
                 {ok, Acc}
@@ -118,7 +141,7 @@ handle_call({get_proc, #doc{body={Props}}=DDoc, DDocKey}, 
From, State) ->
     TeachFun = fun(Proc0, Acc) ->
             {ok, Proc1} = teach_ddoc(DDoc, DDocKey, Proc0),
-            {stop, assign_proc(, ClientPid, Proc1)}
+            {stop, assign_proc(ClientPid, Proc1)}
         catch _:_ ->
             {ok, Acc}
@@ -129,60 +152,76 @@ handle_call({get_proc, #doc{body={Props}}=DDoc, DDocKey}, 
From, State) ->
 handle_call({get_proc, Lang}, From, State) ->
     {ClientPid, _} = From,
     IterFun = fun(Proc, _Acc) ->
-        {stop, assign_proc(, ClientPid, Proc)}
+        {stop, assign_proc(ClientPid, Proc)}
     Client = #client{from=From, lang=couch_util:to_binary(Lang)},
     find_proc(State, Client, [IterFun]);
-handle_call({ret_proc, #proc{client=Ref, lang=Lang0} = Proc}, _From, State) ->
+handle_call({ret_proc, #proc{client=Ref} = Proc}, _From, State) ->
     erlang:demonitor(Ref, [flush]),
-    Lang = couch_util:to_binary(Lang0),
-    % We need to check if the process is alive here, as the client could be
-    % handing us a #proc{} with a dead one.  We would have already removed the
-    % #proc_int{} from our own table, so the alternative is to do a lookup in 
-    % table before the insert.  Don't know which approach is cheaper.
-    {reply, true, return_proc(State, Proc#proc{lang=Lang})};
-handle_call(bump_threshold_ts, _From, #state{tab = Tab} = State) ->
-    FoldFun = fun(#proc_int{client = nil, pid = Pid}, _) ->
-        remove_proc(Tab, Pid);
-    (_, _) ->
-        ok
+    NewState = case ets:lookup(?PROCS, of
+        [#proc_int{}=ProcInt] ->
+            return_proc(State, ProcInt);
+        [] ->
+            % Proc must've died and we already
+            % cleared it out of the table in
+            % the handle_info clause.
+            State
+    end,
+    {reply, true, NewState};
+handle_call(set_threshold_ts, _From, State) ->
+    FoldFun = fun
+        (#proc_int{client = undefined} = Proc, StateAcc) ->
+            remove_proc(StateAcc, Proc);
+        (_, StateAcc) ->
+            StateAcc
-    ets:foldl(FoldFun, nil, Tab),
-    {reply, ok, State#state{threshold_ts = os:timestamp()}};
+    NewState = ets:foldl(FoldFun, State, ?PROCS),
+    {reply, ok, NewState#state{threshold_ts = os:timestamp()}};
-handle_call(terminate_stale_procs, _From, State) ->
-    #state{tab = Tab, threshold_ts = T0} = State,
-    MatchHead = #proc_int{pid = '$1', t0 = '$2', _ = '_'},
-    MatchSpec = [{MatchHead, [{'<', '$2', T0}], ['$1']}],
-    lists:foreach(fun(P) -> remove_proc(Tab,P) end, ets:select(Tab, 
-    {reply, ok, State};
+handle_call(terminate_stale_procs, _From, #state{threshold_ts = Ts1} = State) 
+    FoldFun = fun
+        (#proc_int{client = undefined, t0 = Ts2} = Proc, StateAcc) ->
+            case Ts1 > Ts2 of
+                true ->
+                    remove_proc(StateAcc, Proc);
+                false ->
+                    StateAcc
+            end;
+        (_, StateAcc) ->
+            StateAcc
+    end,
+    NewState = ets:foldl(FoldFun, State, ?PROCS),
+    {reply, ok, NewState};
 handle_call(_Call, _From, State) ->
     {reply, ignored, State}.
-handle_cast({os_proc_idle, Pid}, #state{tab=Tab, proc_counts=Counts}=State0) ->
-    Limit = list_to_integer(
-            config:get("query_server_config", "os_process_soft_limit", "100")),
-    State = case ets:lookup(Tab, Pid) of
-        [#proc_int{client=nil, lang=Lang}] ->
+handle_cast({os_proc_idle, Pid}, #state{counts=Counts}=State) ->
+    NewState = case ets:lookup(?PROCS, Pid) of
+        [#proc_int{client=undefined, lang=Lang}=Proc] ->
             case dict:find(Lang, Counts) of
-                {ok, Count} when Count > Limit ->
+                {ok, Count} when Count >= State#state.soft_limit ->
                     ?LOG_INFO("Closing idle OS Process: ~p", [Pid]),
-                    remove_proc(Tab, Pid),
-                    State0#state{
-                        proc_counts=dict:update_counter(Lang, -1, Counts)
-                    };
+                    remove_proc(State, Proc);
                 {ok, _} ->
-                    State0
+                    State
         _ ->
-            State0
+            State
-    {noreply, State};
+    {noreply, NewState};
 handle_cast(reload_config, State) ->
-    {noreply, State#state{config = get_proc_config()}};
+    NewState = State#state{
+        config = get_proc_config(),
+        hard_limit = get_hard_limit(),
+        soft_limit = get_soft_limit()
+    },
+    {noreply, flush_waiters(NewState)};
 handle_cast(_Msg, State) ->
     {noreply, State}.
@@ -190,38 +229,37 @@ handle_cast(_Msg, State) ->
 handle_info(shutdown, State) ->
     {stop, shutdown, State};
-handle_info({'EXIT', _, {ok, Proc0, {ClientPid,_} = From}}, State) ->
+handle_info({'EXIT', Pid, {spawn_ok, Proc0, {ClientPid,_} = From}}, State) ->
+    ets:delete(?OPENING, Pid),
-    Proc = assign_proc(, ClientPid, Proc0),
+    Proc = assign_proc(ClientPid, Proc0),
     gen_server:reply(From, {ok, Proc, State#state.config}),
     {noreply, State};
+handle_info({'EXIT', Pid, spawn_error}, State) ->
+    [{Pid, #client{lang=Lang}}] = ets:lookup(?OPENING, Pid),
+    ets:delete(?OPENING, Pid),
+    NewState = State#state{
+        counts = dict:update_counter(Lang, -1, State#state.counts)
+    },
+    {noreply, flush_waiters(NewState, Lang)};
 handle_info({'EXIT', Pid, Reason}, State) ->
-    #state{proc_counts=Counts, waiting=Waiting} = State,
     ?LOG_INFO("~p ~p died ~p", [?MODULE, Pid, Reason]),
-    MaybeProc = ets:lookup(, Pid),
-    ets:delete(, Pid),
-    case MaybeProc of
-        [#proc_int{lang=Lang}] ->
-            case get_waiting_client(Waiting, Lang) of
-                nil ->
-                    {noreply, State#state{
-                        proc_counts=dict:update_counter(Lang, -1, Counts)
-                    }};
-                Client ->
-                    spawn_link(?MODULE, new_proc, [Client]),
-                    {noreply, State}
-            end;
+    case ets:lookup(?PROCS, Pid) of
+        [#proc_int{} = Proc] ->
+            NewState = remove_proc(State, Proc),
+            {noreply, flush_waiters(NewState, Proc#proc_int.lang)};
         [] ->
             {noreply, State}
 handle_info({'DOWN', Ref, _, _, _Reason}, State0) ->
-    case ets:match_object(, #proc_int{client=Ref, _='_'}) of
-    [] ->
-        {noreply, State0};
-    [#proc_int{} = Proc] ->
-        {noreply, return_proc(State0, Proc)}
+    case ets:match_object(?PROCS, #proc_int{client=Ref, _='_'}) of
+        [#proc_int{} = Proc] ->
+            {noreply, return_proc(State0, Proc)};
+        [] ->
+            {noreply, State0}
 handle_info({gen_event_EXIT, {config_listener, ?MODULE}, _Reason}, State) ->
@@ -229,31 +267,32 @@ handle_info({gen_event_EXIT, {config_listener, ?MODULE}, 
_Reason}, State) ->
     {noreply, State};
 handle_info(restart_config_listener, State) ->
-    ok = config:listen_for_changes(?MODULE, nil),
-    {noreply, State};
+    ok = config:listen_for_changes(?MODULE, undefined),
+    % Reload our config in case it changed in the last
+    % five seconds.
+    handle_cast(reload, State);
 handle_info(_Msg, State) ->
     {noreply, State}.
-terminate(_Reason, #state{tab=Tab}) ->
-    ets:foldl(fun(#proc_int{pid=P}, _) -> couch_util:shutdown_sync(P) end, 0, 
-    ok.
 code_change(_OldVsn, #state{}=State, _Extra) ->
     {ok, State}.
 handle_config_change("query_server_config", _, _, _, _) ->
     gen_server:cast(?MODULE, reload_config),
-    {ok, nil};
+    {ok, undefined};
 handle_config_change(_, _, _, _, _) ->
-    {ok, nil}.
-find_proc(State, Client, [Fun|FindFuns]) ->
-    try iter_procs(, Client#client.lang, Fun, nil) of
-    {not_found, _} ->
-        find_proc(State, Client, FindFuns);
-    {ok, Proc} ->
-        {reply, {ok, Proc, State#state.config}, State}
+    {ok, undefined}.
+find_proc(State, Client, [Fun | FindFuns]) ->
+    try iter_procs(Client#client.lang, Fun, undefined) of
+        {not_found, _} ->
+            find_proc(State, Client, FindFuns);
+        {ok, Proc} ->
+            {reply, {ok, Proc, State#state.config}, State}
     catch error:Reason ->
         ?LOG_ERROR("~p ~p ~p", [?MODULE, Reason, erlang:get_stacktrace()]),
         {reply, {error, Reason}, State}
@@ -261,62 +300,96 @@ find_proc(State, Client, [Fun|FindFuns]) ->
 find_proc(State, Client, []) ->
     {noreply, maybe_spawn_proc(State, Client)}.
-iter_procs(Tab, Lang, Fun, Acc) when is_list(Lang) ->
-    iter_procs(Tab, list_to_binary(Lang), Fun, Acc);
-iter_procs(Tab, Lang, Fun, Acc) ->
-    Pattern = #proc_int{lang=Lang, client=nil, _='_'},
+iter_procs(Lang, Fun, Acc) when is_binary(Lang) ->
+    Pattern = #proc_int{lang=Lang, client=undefined, _='_'},
     MSpec = [{Pattern, [], ['$_']}],
-    case ets:select_reverse(Tab, MSpec, 25) of
+    case ets:select_reverse(?PROCS, MSpec, 25) of
         '$end_of_table' ->
             {not_found, Acc};
         Continuation ->
-            iter_procs(Continuation, Fun, Acc)
+            iter_procs_int(Continuation, Fun, Acc)
-iter_procs({[], Continuation0}, Fun, Acc) ->
+iter_procs_int({[], Continuation0}, Fun, Acc) ->
     case ets:select_reverse(Continuation0) of
         '$end_of_table' ->
             {not_found, Acc};
         Continuation1 ->
-            iter_procs(Continuation1, Fun, Acc)
+            iter_procs_int(Continuation1, Fun, Acc)
-iter_procs({[Proc | Rest], Continuation}, Fun, Acc0) ->
+iter_procs_int({[Proc | Rest], Continuation}, Fun, Acc0) ->
     case Fun(Proc, Acc0) of
         {ok, Acc1} ->
-            iter_procs({Rest, Continuation}, Fun, Acc1);
+            iter_procs_int({Rest, Continuation}, Fun, Acc1);
         {stop, Acc1} ->
             {ok, Acc1}
+maybe_spawn_proc(State, Client) ->
+    case dict:find(Client#client.lang, State#state.counts) of
+    {ok, Count} when Count >= State#state.hard_limit ->
+        add_waiting_client(Client),
+        State;
+    _ ->
+        spawn_proc(State, Client)
+    end.
+spawn_proc(State, Client) ->
+    Pid = spawn_link(?MODULE, new_proc, [Client]),
+    ets:insert(?OPENING, {Pid, Client}),
+    Counts = State#state.counts,
+    Lang = Client#client.lang,
+    State#state{
+        counts = dict:update_counter(Lang, 1, Counts)
+    }.
 new_proc(#client{ddoc=undefined, ddoc_key=undefined}=Client) ->
     #client{from=From, lang=Lang} = Client,
-    case new_proc_int(From, Lang) of
-    {ok, Proc} ->
-        exit({ok, Proc, From});
-    Error ->
-        gen_server:reply(From, {error, Error})
-    end;
+    Resp = try
+        case new_proc_int(From, Lang) of
+            {ok, Proc} ->
+                {spawn_ok, Proc, From};
+            Error ->
+                gen_server:reply(From, {error, Error}),
+                spawn_error
+        end
+    catch _:_ ->
+        spawn_error
+    end,
+    exit(Resp);
 new_proc(Client) ->
     #client{from=From, lang=Lang, ddoc=DDoc, ddoc_key=DDocKey} = Client,
-    case new_proc_int(From, Lang) of
-    {ok, NewProc} ->
-        case proc_with_ddoc(DDoc, DDocKey, [NewProc]) of
-        {ok, Proc} ->
-            exit({ok, Proc, From});
-        {error, Reason} ->
-            gen_server:reply(From, {error, Reason})
-        end;
-    Error ->
-        gen_server:reply(From, {error, Error})
-    end.
+    Resp = try
+        case new_proc_int(From, Lang) of
+        {ok, NewProc} ->
+            case teach_ddoc(DDoc, DDocKey, NewProc) of
+            {ok, Proc} ->
+                {spawn_ok, Proc, From};
+            {error, Reason} ->
+                gen_server:reply(From, {error, Reason}),
+                spawn_error
+            end;
+        Error ->
+            gen_server:reply(From, {error, Error}),
+            spawn_error
+        end
+    catch _:_ ->
+        spawn_error
+    end,
+    exit(Resp).
 new_proc_int(From, Lang) when is_binary(Lang) ->
-    new_proc_int(From, binary_to_list(Lang));
-new_proc_int(From, Lang) when is_list(Lang) ->
-    case config:get("query_servers", Lang) of
+    LangStr = binary_to_list(Lang),
+    case config:get("query_servers", LangStr) of
     undefined ->
-        case config:get("native_query_servers", Lang) of
+        case config:get("native_query_servers", LangStr) of
         undefined ->
             gen_server:reply(From, {unknown_query_language, Lang});
         SpecStr ->
@@ -329,23 +402,6 @@ new_proc_int(From, Lang) when is_list(Lang) ->
         make_proc(Pid, Lang, couch_os_process)
-proc_with_ddoc(DDoc, DDocKey, Procs) ->
-    Filter = fun(#proc_int{ddoc_keys=Keys}) -> not lists:member(DDocKey, Keys) 
-    case lists:dropwhile(Filter, Procs) of
-    [DDocProc|_] ->
-        {ok, DDocProc};
-    [] ->
-        teach_any_proc(DDoc, DDocKey, Procs)
-    end.
-teach_any_proc(DDoc, DDocKey, [Proc|Rest]) ->
-    try
-        teach_ddoc(DDoc, DDocKey, Proc)
-    catch _:_ ->
-        teach_any_proc(DDoc, DDocKey, Rest)
-    end;
-teach_any_proc(_, _, []) ->
-    {error, noproc}.
 teach_ddoc(DDoc, {DDocId, _Rev}=DDocKey, #proc_int{ddoc_keys=Keys}=Proc) ->
     % send ddoc over the wire
@@ -360,9 +416,10 @@ teach_ddoc(DDoc, {DDocId, _Rev}=DDocKey, 
#proc_int{ddoc_keys=Keys}=Proc) ->
     % add ddoc to the proc
     {ok, Proc#proc_int{ddoc_keys=[DDocKey|Keys2]}}.
-make_proc(Pid, Lang, Mod) ->
+make_proc(Pid, Lang, Mod) when is_binary(Lang) ->
     Proc = #proc_int{
-        lang = couch_util:to_binary(Lang),
+        lang = Lang,
         pid = Pid,
         prompt_fun = {Mod, prompt},
         set_timeout_fun = {Mod, set_timeout},
@@ -371,97 +428,98 @@ make_proc(Pid, Lang, Mod) ->
     {ok, Proc}.
-assign_proc(Tab, ClientPid, #proc_int{client=nil}=Proc0) when 
is_pid(ClientPid) ->
-    Proc = Proc0#proc_int{client = erlang:monitor(process, ClientPid)},
-    ets:insert(Tab, Proc),
+assign_proc(Pid, #proc_int{client=undefined}=Proc0) when is_pid(Pid) ->
+    Proc = Proc0#proc_int{client = erlang:monitor(process, Pid)},
+    ets:insert(?PROCS, Proc),
-assign_proc(Tab, #client{}=Client, #proc_int{client=nil}=Proc) ->
+assign_proc(#client{}=Client, #proc_int{client=undefined}=Proc) ->
     {Pid, _} = Client#client.from,
-    assign_proc(Tab, Pid, Proc).
+    assign_proc(Pid, Proc).
-return_proc(#state{} = State, #proc{} = Proc) ->
-    case ets:lookup(, of
-        [#proc_int{}=ProcInt] ->
-            return_proc(State, ProcInt);
-        [] ->
-            % Proc must've died and we already
-            % cleared it out of the table in
-            % the handle_info clause.
-            ok
-    end;
 return_proc(#state{} = State, #proc_int{} = ProcInt) ->
-    #state{tab = Tab, waiting = Waiting, threshold_ts = T0} = State,
     #proc_int{pid = Pid, lang = Lang} = ProcInt,
-    case is_process_alive(Pid) of true ->
-        case get_waiting_client(Waiting, Lang) of
-            nil ->
-                if ProcInt#proc_int.t0 < T0 ->
-                    remove_proc(Tab, Pid);
-                true ->
-                    gen_server:cast(Pid, garbage_collect),
-                    ets:insert(Tab, ProcInt#proc_int{client=nil})
-                end,
-                State;
-            #client{}=Client ->
-                From = Client#client.from,
-                assign_proc(Tab, Client, ProcInt#proc_int{client=nil}),
-                gen_server:reply(From, {ok, ProcInt, State#state.config}),
+    NewState = case is_process_alive(Pid) of true ->
+        case ProcInt#proc_int.t0 < State#state.threshold_ts of
+            true ->
+                remove_proc(State, Pid);
+            false ->
+                gen_server:cast(Pid, garbage_collect),
+                true = ets:update_element(?PROCS, Pid, [
+                    {#proc_int.client, undefined}
+                ]),
     false ->
-        ets:delete(Tab, Pid),
-        case get_waiting_client(Waiting, Lang) of
-            nil ->
-                State;
-            #client{}=Client ->
-                maybe_spawn_proc(State, Client)
-        end
-    end.
+        remove_proc(State, ProcInt)
+    end,
+    flush_waiters(NewState, Lang).
-remove_proc(Tab, Pid) ->
-    ets:delete(Tab, Pid),
-    case is_process_alive(Pid) of true ->
-        unlink(Pid),
-        gen_server:cast(Pid, stop);
+remove_proc(State, #proc_int{}=Proc) ->
+    ets:delete(?PROCS,,
+    case is_process_alive( of true ->
+        unlink(,
+        gen_server:cast(, stop);
     false ->
-    end.
+    end,
+    Counts = State#state.counts,
+    Lang = Proc#proc_int.lang,
+    State#state{
+        counts = dict:update_counter(Lang, -1, Counts)
+    }.
 -spec export_proc(#proc_int{}) -> #proc{}.
 export_proc(#proc_int{} = ProcInt) ->
-    [_ | Data] = lists:sublist(tuple_to_list(ProcInt), record_info(size, 
+    ProcIntList = tuple_to_list(ProcInt),
+    ProcLen = record_info(size, proc),
+    [_ | Data] = lists:sublist(ProcIntList, ProcLen),
     list_to_tuple([proc | Data]).
-maybe_spawn_proc(State, Client) ->
-    #state{proc_counts=Counts, waiting=Waiting} = State,
-    #client{lang=Lang} = Client,
-    Limit = list_to_integer(config:get(
-                "query_server_config", "os_process_limit", "100")),
-    case dict:find(Lang, Counts) of
-    {ok, Limit} ->
-        add_waiting_client(Waiting, Client),
-        State;
-    _ ->
-        spawn_link(?MODULE, new_proc, [Client]),
-        State#state{
-            proc_counts=dict:update_counter(Lang, 1, Counts)
-        }
+flush_waiters(State) ->
+    dict:fold(fun(Lang, Count, StateAcc) ->
+        case Count < State#state.hard_limit of
+            true ->
+                flush_waiters(StateAcc, Lang);
+            false ->
+                StateAcc
+        end
+    end, State, State#state.counts).
+flush_waiters(State, Lang) ->
+    case dict:fetch(Lang, State#state.counts) of
+        Count when Count < State#state.hard_limit ->
+            case get_waiting_client(Lang) of
+                #client{} = Client ->
+                    NewState = spawn_proc(State, Client),
+                    flush_waiters(NewState, Lang);
+                undefined ->
+                    State
+            end;
+        _ ->
+            State
-add_waiting_client(Tab, Client) ->
-    ets:insert(Tab, Client#client{timestamp=os:timestamp()}).
-get_waiting_client(Tab, Lang) when is_list(Lang) ->
-    get_waiting_client(Tab, couch_util:to_binary(Lang));
-get_waiting_client(Tab, Lang) ->
-    case ets:match_object(Tab, #client{lang=Lang, _='_'}, 1) of
+add_waiting_client(Client) ->
+    ets:insert(?WAITERS, Client#client{timestamp=os:timestamp()}).
+get_waiting_client(Lang) ->
+    case ets:match_object(?WAITERS, #client{lang=Lang, _='_'}, 1) of
         '$end_of_table' ->
-            nil;
+            undefined;
         {[#client{}=Client], _} ->
-            ets:delete(Tab, Client#client.timestamp),
+            ets:delete(?WAITERS, Client#client.timestamp),
 get_proc_config() ->
     Limit = config:get("query_server_config", "reduce_limit", "true"),
     Timeout = config:get("couchdb", "os_process_timeout", "5000"),
@@ -469,3 +527,13 @@ get_proc_config() ->
         {<<"reduce_limit">>, list_to_atom(Limit)},
         {<<"timeout">>, list_to_integer(Timeout)}
+get_hard_limit() ->
+    LimStr = config:get("query_server_config", "os_process_limit", "100"),
+    list_to_integer(LimStr).
+get_soft_limit() ->
+    LimStr = config:get("query_server_config", "os_process_soft_limit", "100"),
+    list_to_integer(LimStr).

