Hello community, here is the log from the commit of package rabbitmq-server for openSUSE:Factory checked in at 2014-03-28 12:13:08 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ Comparing /work/SRC/openSUSE:Factory/rabbitmq-server (Old) and /work/SRC/openSUSE:Factory/.rabbitmq-server.new (New) ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Package is "rabbitmq-server" Changes: -------- --- /work/SRC/openSUSE:Factory/rabbitmq-server/rabbitmq-server.changes 2014-01-31 09:44:14.000000000 +0100 +++ /work/SRC/openSUSE:Factory/.rabbitmq-server.new/rabbitmq-server.changes 2014-03-28 12:13:09.000000000 +0100 @@ -1,0 +2,7 @@ +Thu Mar 27 11:20:12 UTC 2014 - dmuel...@suse.com + +- update to 3.2.4: + * Several bug fixes, no detailed changelog available +- remove fix-syntax-error-in-example-conf.patch: merged upstream + +------------------------------------------------------------------- Old: ---- fix-syntax-error-in-example-conf.patch rabbitmq-server-3.2.3.tar.gz New: ---- rabbitmq-server-3.2.4.tar.gz ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ Other differences: ------------------ ++++++ rabbitmq-server.spec ++++++ --- /var/tmp/diff_new_pack.pYwE62/_old 2014-03-28 12:13:10.000000000 +0100 +++ /var/tmp/diff_new_pack.pYwE62/_new 2014-03-28 12:13:10.000000000 +0100 @@ -24,7 +24,7 @@ %endif Name: rabbitmq-server -Version: 3.2.3 +Version: 3.2.4 Release: 0 Summary: The RabbitMQ Server License: MPL-1.1 @@ -41,7 +41,6 @@ Source7: rabbitmq-server.tmpfiles.d.conf Patch0: no-nmap.patch Patch1: no-remove-common.patch -Patch2: fix-syntax-error-in-example-conf.patch BuildRequires: erlang BuildRequires: erlang-src BuildRequires: fdupes @@ -103,7 +102,6 @@ %setup -q %patch0 %patch1 -%patch2 %build make all VERSION=%{version} ++++++ rabbitmq-server-3.2.3.tar.gz -> rabbitmq-server-3.2.4.tar.gz ++++++ diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/rabbitmq-server-3.2.3/docs/rabbitmq.config.example new/rabbitmq-server-3.2.4/docs/rabbitmq.config.example --- old/rabbitmq-server-3.2.3/docs/rabbitmq.config.example 2014-01-23 15:57:34.000000000 +0100 +++ new/rabbitmq-server-3.2.4/docs/rabbitmq.config.example 2014-03-03 17:22:14.000000000 +0100 @@ -336,7 +336,7 @@ %% {reconnect_delay, 2.5} %% ]} %% End of my_first_shovel - ]}, + ]} %% Rather than specifying some values per-shovel, you can specify %% them for all shovels here. %% diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/rabbitmq-server-3.2.3/ebin/rabbit_app.in new/rabbitmq-server-3.2.4/ebin/rabbit_app.in --- old/rabbitmq-server-3.2.3/ebin/rabbit_app.in 2014-01-23 15:57:34.000000000 +0100 +++ new/rabbitmq-server-3.2.4/ebin/rabbit_app.in 2014-03-03 17:22:14.000000000 +0100 @@ -1,7 +1,7 @@ {application, rabbit, %% -*- erlang -*- [{description, "RabbitMQ"}, {id, "RabbitMQ"}, - {vsn, "3.2.3"}, + {vsn, "3.2.4"}, {modules, []}, {registered, [rabbit_amqqueue_sup, rabbit_log, diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/rabbitmq-server-3.2.3/plugins-src/rabbitmq-erlang-client/src/amqp_channels_manager.erl new/rabbitmq-server-3.2.4/plugins-src/rabbitmq-erlang-client/src/amqp_channels_manager.erl --- old/rabbitmq-server-3.2.3/plugins-src/rabbitmq-erlang-client/src/amqp_channels_manager.erl 2014-01-23 15:57:34.000000000 +0100 +++ new/rabbitmq-server-3.2.4/plugins-src/rabbitmq-erlang-client/src/amqp_channels_manager.erl 2014-03-03 17:22:14.000000000 +0100 @@ -173,6 +173,8 @@ maybe_report_down(_Pid, normal, _State) -> ok; +maybe_report_down(_Pid, shutdown, _State) -> + ok; maybe_report_down(_Pid, {app_initiated_close, _, _}, _State) -> ok; maybe_report_down(_Pid, {server_initiated_close, _, _}, _State) -> diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/rabbitmq-server-3.2.3/plugins-src/rabbitmq-federation/src/rabbit_federation_exchange_link.erl new/rabbitmq-server-3.2.4/plugins-src/rabbitmq-federation/src/rabbit_federation_exchange_link.erl --- old/rabbitmq-server-3.2.3/plugins-src/rabbitmq-federation/src/rabbit_federation_exchange_link.erl 2014-01-23 15:57:34.000000000 +0100 +++ new/rabbitmq-server-3.2.4/plugins-src/rabbitmq-federation/src/rabbit_federation_exchange_link.erl 2014-03-03 17:22:14.000000000 +0100 @@ -355,13 +355,6 @@ key(#binding{key = Key, args = Args}) -> {Key, Args}. go(S0 = {not_started, {Upstream, UParams, DownXName}}) -> - %% We trap exits so terminate/2 gets called. Note that this is not - %% in init() since we need to cope with the link getting restarted - %% during shutdown (when a broker federates with itself), which - %% means we hang in federation_up() and the supervisor must force - %% us to exit. We can therefore only trap exits when past that - %% point. Bug 24372 may help us do something nicer. - process_flag(trap_exit, true), Unacked = rabbit_federation_link_util:unacked_new(), rabbit_federation_link_util:start_conn_ch( fun (Conn, Ch, DConn, DCh) -> diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/rabbitmq-server-3.2.3/plugins-src/rabbitmq-federation/src/rabbit_federation_exchange_link_sup_sup.erl new/rabbitmq-server-3.2.4/plugins-src/rabbitmq-federation/src/rabbit_federation_exchange_link_sup_sup.erl --- old/rabbitmq-server-3.2.3/plugins-src/rabbitmq-federation/src/rabbit_federation_exchange_link_sup_sup.erl 2014-01-23 15:57:34.000000000 +0100 +++ new/rabbitmq-server-3.2.4/plugins-src/rabbitmq-federation/src/rabbit_federation_exchange_link_sup_sup.erl 2014-03-03 17:22:14.000000000 +0100 @@ -30,8 +30,9 @@ %%---------------------------------------------------------------------------- start_link() -> - mirrored_supervisor:start_link({local, ?SUPERVISOR}, - ?SUPERVISOR, ?MODULE, []). + mirrored_supervisor:start_link({local, ?SUPERVISOR}, ?SUPERVISOR, + fun rabbit_misc:execute_mnesia_transaction/1, + ?MODULE, []). %% Note that the next supervisor down, rabbit_federation_link_sup, is common %% between exchanges and queues. diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/rabbitmq-server-3.2.3/plugins-src/rabbitmq-federation/src/rabbit_federation_link_util.erl new/rabbitmq-server-3.2.4/plugins-src/rabbitmq-federation/src/rabbit_federation_link_util.erl --- old/rabbitmq-server-3.2.3/plugins-src/rabbitmq-federation/src/rabbit_federation_link_util.erl 2014-01-23 15:57:34.000000000 +0100 +++ new/rabbitmq-server-3.2.4/plugins-src/rabbitmq-federation/src/rabbit_federation_link_util.erl 2014-03-03 17:22:14.000000000 +0100 @@ -36,13 +36,6 @@ start_conn_ch(Fun, Upstream, UParams, XorQName = #resource{virtual_host = DownVHost}, State) -> - %% We trap exits so terminate/2 gets called. Note that this is not - %% in init() since we need to cope with the link getting restarted - %% during shutdown (when a broker federates with itself), which - %% means we hang in federation_up() and the supervisor must force - %% us to exit. We can therefore only trap exits when past that - %% point. Bug 24372 may help us do something nicer. - process_flag(trap_exit, true), case open_monitor(local_params(Upstream, DownVHost)) of {ok, DConn, DCh} -> case Upstream#upstream.ack_mode of @@ -55,6 +48,12 @@ end, case open_monitor(UParams#upstream_params.params) of {ok, Conn, Ch} -> + %% Don't trap exits until we have established + %% connections so that if we try to delete + %% federation upstreams while waiting for a + %% connection to be established then we don't + %% block + process_flag(trap_exit, true), try R = Fun(Conn, Ch, DConn, DCh), rabbit_log:info( diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/rabbitmq-server-3.2.3/plugins-src/rabbitmq-management/src/rabbit_mgmt_app.erl new/rabbitmq-server-3.2.4/plugins-src/rabbitmq-management/src/rabbit_mgmt_app.erl --- old/rabbitmq-server-3.2.3/plugins-src/rabbitmq-management/src/rabbit_mgmt_app.erl 2014-01-23 15:57:34.000000000 +0100 +++ new/rabbitmq-server-3.2.4/plugins-src/rabbitmq-management/src/rabbit_mgmt_app.erl 2014-03-03 17:22:14.000000000 +0100 @@ -31,7 +31,7 @@ setup_wm_logging(), register_context(Listener), log_startup(Listener), - rabbit_mgmt_sup:start_link(). + rabbit_mgmt_sup_sup:start_link(). stop(_State) -> unregister_context(), diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/rabbitmq-server-3.2.3/plugins-src/rabbitmq-management/src/rabbit_mgmt_db.erl new/rabbitmq-server-3.2.4/plugins-src/rabbitmq-management/src/rabbit_mgmt_db.erl --- old/rabbitmq-server-3.2.3/plugins-src/rabbitmq-management/src/rabbit_mgmt_db.erl 2014-01-23 15:57:34.000000000 +0100 +++ new/rabbitmq-server-3.2.4/plugins-src/rabbitmq-management/src/rabbit_mgmt_db.erl 2014-03-03 17:22:14.000000000 +0100 @@ -179,19 +179,14 @@ %%---------------------------------------------------------------------------- start_link() -> - %% When failing over it is possible that the mirrored_supervisor - %% might hear of the death of the old DB, and start a new one, - %% before the global name server notices. Therefore rather than - %% telling gen_server:start_link/4 to register it for us, we - %% invoke global:re_register_name/2 ourselves, and just steal the - %% name if it existed before. We therefore rely on - %% mirrored_supervisor to maintain the uniqueness of this process. - case gen_server2:start_link(?MODULE, [], []) of - {ok, Pid} -> yes = global:re_register_name(?MODULE, Pid), + case gen_server2:start_link({global, ?MODULE}, ?MODULE, [], []) of + {ok, Pid} -> register(?MODULE, Pid), %% [1] rabbit:force_event_refresh(), {ok, Pid}; Else -> Else end. +%% [1] For debugging it's helpful to locally register the name too +%% since that shows up in places global names don't. %% R = Ranges, M = Mode augment_exchanges(Xs, R, M) -> safe_call({augment_exchanges, Xs, R, M}, Xs). @@ -211,12 +206,19 @@ override_lookups(Lookups) -> safe_call({override_lookups, Lookups}). reset_lookups() -> safe_call(reset_lookups). -safe_call(Term) -> safe_call(Term, []). +safe_call(Term) -> safe_call(Term, []). +safe_call(Term, Default) -> safe_call(Term, Default, 1). -safe_call(Term, Item) -> +%% See rabbit_mgmt_sup_sup for a discussion of the retry logic. +safe_call(Term, Default, Retries) -> try gen_server2:call({global, ?MODULE}, Term, infinity) - catch exit:{noproc, _} -> Item + catch exit:{noproc, _} -> + case Retries of + 0 -> Default; + _ -> rabbit_mgmt_sup_sup:start_child(), + safe_call(Term, Default, Retries - 1) + end end. %%---------------------------------------------------------------------------- diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/rabbitmq-server-3.2.3/plugins-src/rabbitmq-management/src/rabbit_mgmt_sup.erl new/rabbitmq-server-3.2.4/plugins-src/rabbitmq-management/src/rabbit_mgmt_sup.erl --- old/rabbitmq-server-3.2.3/plugins-src/rabbitmq-management/src/rabbit_mgmt_sup.erl 2014-01-23 15:57:34.000000000 +0100 +++ new/rabbitmq-server-3.2.4/plugins-src/rabbitmq-management/src/rabbit_mgmt_sup.erl 2014-03-03 17:22:14.000000000 +0100 @@ -29,4 +29,6 @@ {ok, {{one_for_one, 10, 10}, [DB]}}. start_link() -> - mirrored_supervisor:start_link({local, ?MODULE}, ?MODULE, ?MODULE, []). + mirrored_supervisor:start_link( + {local, ?MODULE}, ?MODULE, fun rabbit_misc:execute_mnesia_transaction/1, + ?MODULE, []). diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/rabbitmq-server-3.2.3/plugins-src/rabbitmq-management/src/rabbit_mgmt_sup_sup.erl new/rabbitmq-server-3.2.4/plugins-src/rabbitmq-management/src/rabbit_mgmt_sup_sup.erl --- old/rabbitmq-server-3.2.3/plugins-src/rabbitmq-management/src/rabbit_mgmt_sup_sup.erl 1970-01-01 01:00:00.000000000 +0100 +++ new/rabbitmq-server-3.2.4/plugins-src/rabbitmq-management/src/rabbit_mgmt_sup_sup.erl 2014-03-03 17:22:14.000000000 +0100 @@ -0,0 +1,64 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ Management Console. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2011-2013 GoPivotal, Inc. All rights reserved. +%% + +-module(rabbit_mgmt_sup_sup). + +%% We want there to be one management database in the cluster, with a +%% globally registered name. So we use mirrored_supervisor for +%% failover (in rabbit_mgmt_sup) and register a global name for the +%% database. +%% +%% Unfortunately it's more complicated than using these things +%% naively. The first problem is that on failover the mirrored +%% supervisor might move the DB to a new node before the global name +%% database notices and removes the old record. In that case starting +%% the new database will fail. +%% +%% The second problem is that after a network partition things get +%% worse. Since mirrored_supervisor uses Mnesia for global shared +%% state, we have effectively two (or more) mirrored_supervisors. But +%% the global name database does not do this, so at least one of them +%% cannot start the management database; so the mirrored supervisor +%% has to die. But what if the admin restarts the partition which +%% contains the management DB? In that case we need to start a new +%% management DB in the winning partition. +%% +%% Rather than try to get mirrored_supervisor to handle this +%% post-partition state we go for a simpler approach: allow the whole +%% mirrored_supervisor to die in the two edge cases above, and +%% whenever we want to call into the mgmt DB we will start it up if it +%% appears not to be there. See rabbit_mgmt_db:safe_call/3 for the +%% code which restarts the DB if necessary. + +-behaviour(supervisor2). + +-export([start_link/0, start_child/0]). +-export([init/1]). + +-include_lib("rabbit_common/include/rabbit.hrl"). + +start_link() -> supervisor2:start_link({local, ?MODULE}, ?MODULE, []). + +start_child() -> supervisor2:start_child( ?MODULE, sup()). + +%%---------------------------------------------------------------------------- + +init([]) -> + {ok, {{one_for_one, 0, 1}, [sup()]}}. + +sup() -> + {rabbit_mgmt_sup, {rabbit_mgmt_sup, start_link, []}, + temporary, ?MAX_WAIT, supervisor, [rabbit_mgmt_sup]}. diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/rabbitmq-server-3.2.3/plugins-src/rabbitmq-management-agent/src/rabbit_mgmt_external_stats.erl new/rabbitmq-server-3.2.4/plugins-src/rabbitmq-management-agent/src/rabbit_mgmt_external_stats.erl --- old/rabbitmq-server-3.2.3/plugins-src/rabbitmq-management-agent/src/rabbit_mgmt_external_stats.erl 2014-01-23 15:57:34.000000000 +0100 +++ new/rabbitmq-server-3.2.4/plugins-src/rabbitmq-management-agent/src/rabbit_mgmt_external_stats.erl 2014-03-03 17:22:14.000000000 +0100 @@ -233,6 +233,8 @@ format_mochiweb_option(ssl_opts, V) -> format_mochiweb_option_list(V); +format_mochiweb_option(ciphers, V) -> + list_to_binary(rabbit_misc:format("~w", [V])); format_mochiweb_option(_K, V) when is_list(V) -> list_to_binary(V); format_mochiweb_option(_K, V) -> diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/rabbitmq-server-3.2.3/plugins-src/rabbitmq-mqtt/src/rabbit_mqtt_processor.erl new/rabbitmq-server-3.2.4/plugins-src/rabbitmq-mqtt/src/rabbit_mqtt_processor.erl --- old/rabbitmq-server-3.2.3/plugins-src/rabbitmq-mqtt/src/rabbit_mqtt_processor.erl 2014-01-23 15:57:34.000000000 +0100 +++ new/rabbitmq-server-3.2.4/plugins-src/rabbitmq-mqtt/src/rabbit_mqtt_processor.erl 2014-03-03 17:22:14.000000000 +0100 @@ -363,6 +363,9 @@ supported_subs_qos(?QOS_1) -> ?QOS_1; supported_subs_qos(?QOS_2) -> ?QOS_1. +delivery_mode(?QOS_0) -> 1; +delivery_mode(?QOS_1) -> 2. + %% different qos subscriptions are received in different queues %% with appropriate durability and timeout arguments %% this will lead to duplicate messages for overlapping subscriptions @@ -444,7 +447,8 @@ rabbit_mqtt_util:mqtt2amqp(Topic)}, Headers = [{<<"x-mqtt-publish-qos">>, byte, Qos}, {<<"x-mqtt-dup">>, bool, Dup}], - Msg = #amqp_msg{ props = #'P_basic'{ headers = Headers }, + Msg = #amqp_msg{ props = #'P_basic'{ headers = Headers, + delivery_mode = delivery_mode(Qos)}, payload = Payload }, {UnackedPubs1, Ch, SeqNo1} = case Qos =:= ?QOS_1 andalso MessageId =/= undefined of diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/rabbitmq-server-3.2.3/plugins-src/rabbitmq-shovel/src/rabbit_shovel_worker.erl new/rabbitmq-server-3.2.4/plugins-src/rabbitmq-shovel/src/rabbit_shovel_worker.erl --- old/rabbitmq-server-3.2.3/plugins-src/rabbitmq-shovel/src/rabbit_shovel_worker.erl 2014-01-23 15:57:34.000000000 +0100 +++ new/rabbitmq-server-3.2.4/plugins-src/rabbitmq-shovel/src/rabbit_shovel_worker.erl 2014-03-03 17:22:14.000000000 +0100 @@ -47,7 +47,6 @@ {noreply, State}. handle_cast(init, State = #state{config = Config}) -> - process_flag(trap_exit, true), %% TODO when we move to minimum R13B01: %% random:seed(now()), {A, B, C} = now(), @@ -58,6 +57,11 @@ {OutboundConn, OutboundChan, OutboundParams} = make_conn_and_chan(Destinations#endpoint.amqp_params), + %% Don't trap exits until we have established connections so that + %% if we try to shut down while waiting for a connection to be + %% established then we don't block + process_flag(trap_exit, true), + create_resources(InboundChan, Sources#endpoint.resource_declarations), diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/rabbitmq-server-3.2.3/plugins-src/rabbitmq-shovel/src/rabbit_shovel_worker_sup.erl new/rabbitmq-server-3.2.4/plugins-src/rabbitmq-shovel/src/rabbit_shovel_worker_sup.erl --- old/rabbitmq-server-3.2.3/plugins-src/rabbitmq-shovel/src/rabbit_shovel_worker_sup.erl 2014-01-23 15:57:34.000000000 +0100 +++ new/rabbitmq-server-3.2.4/plugins-src/rabbitmq-shovel/src/rabbit_shovel_worker_sup.erl 2014-03-03 17:22:14.000000000 +0100 @@ -23,6 +23,7 @@ start_link(ShovelName, ShovelConfig) -> mirrored_supervisor:start_link({local, ShovelName}, ShovelName, + fun rabbit_misc:execute_mnesia_transaction/1, ?MODULE, [ShovelName, ShovelConfig]). init([ShovelName, Config]) -> diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/rabbitmq-server-3.2.3/plugins-src/rabbitmq-stomp/src/rabbit_stomp_client_sup.erl new/rabbitmq-server-3.2.4/plugins-src/rabbitmq-stomp/src/rabbit_stomp_client_sup.erl --- old/rabbitmq-server-3.2.3/plugins-src/rabbitmq-stomp/src/rabbit_stomp_client_sup.erl 2014-01-23 15:57:34.000000000 +0100 +++ new/rabbitmq-server-3.2.4/plugins-src/rabbitmq-stomp/src/rabbit_stomp_client_sup.erl 2014-03-03 17:22:14.000000000 +0100 @@ -22,6 +22,12 @@ start_link(Configuration) -> {ok, SupPid} = supervisor2:start_link(?MODULE, []), + {ok, HelperPid} = + supervisor2:start_child(SupPid, + {rabbit_stomp_heartbeat_sup, + {rabbit_connection_helper_sup, start_link, []}, + intrinsic, infinity, supervisor, + [rabbit_connection_helper_sup]}), %% The processor is intrinsic. When it exits, the supervisor goes too. {ok, ProcessorPid} = supervisor2:start_child(SupPid, @@ -38,7 +44,7 @@ SupPid, {rabbit_stomp_reader, {rabbit_stomp_reader, - start_link, [SupPid, ProcessorPid, Configuration]}, + start_link, [HelperPid, ProcessorPid, Configuration]}, transient, ?MAX_WAIT, worker, [rabbit_stomp_reader]}), diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/rabbitmq-server-3.2.3/plugins-src/rabbitmq-stomp/src/rabbit_stomp_reader.erl new/rabbitmq-server-3.2.4/plugins-src/rabbitmq-stomp/src/rabbit_stomp_reader.erl --- old/rabbitmq-server-3.2.3/plugins-src/rabbitmq-stomp/src/rabbit_stomp_reader.erl 2014-01-23 15:57:34.000000000 +0100 +++ new/rabbitmq-server-3.2.4/plugins-src/rabbitmq-stomp/src/rabbit_stomp_reader.erl 2014-03-03 17:22:14.000000000 +0100 @@ -29,24 +29,24 @@ %%---------------------------------------------------------------------------- -start_link(SupPid, ProcessorPid, Configuration) -> +start_link(SupHelperPid, ProcessorPid, Configuration) -> {ok, proc_lib:spawn_link(?MODULE, init, - [SupPid, ProcessorPid, Configuration])}. + [SupHelperPid, ProcessorPid, Configuration])}. log(Level, Fmt, Args) -> rabbit_log:log(connection, Level, Fmt, Args). -init(SupPid, ProcessorPid, Configuration) -> - Reply = go(SupPid, ProcessorPid, Configuration), +init(SupHelperPid, ProcessorPid, Configuration) -> + Reply = go(SupHelperPid, ProcessorPid, Configuration), rabbit_stomp_processor:flush_and_die(ProcessorPid), Reply. -go(SupPid, ProcessorPid, Configuration) -> +go(SupHelperPid, ProcessorPid, Configuration) -> receive {go, Sock0, SockTransform} -> {ok, Sock} = SockTransform(Sock0), case rabbit_net:connection_string(Sock, inbound) of {ok, ConnStr} -> - ProcInitArgs = processor_args(SupPid, Configuration, Sock), + ProcInitArgs = processor_args(SupHelperPid, Configuration, Sock), rabbit_stomp_processor:init_arg(ProcessorPid, ProcInitArgs), log(info, "accepting STOMP connection ~p (~s)~n", [self(), ConnStr]), diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/rabbitmq-server-3.2.3/src/mirrored_supervisor.erl new/rabbitmq-server-3.2.4/src/mirrored_supervisor.erl --- old/rabbitmq-server-3.2.3/src/mirrored_supervisor.erl 2014-01-23 15:57:34.000000000 +0100 +++ new/rabbitmq-server-3.2.4/src/mirrored_supervisor.erl 2014-03-03 17:22:14.000000000 +0100 @@ -57,10 +57,10 @@ %% This is basically the same as for supervisor, except that: %% %% 1) start_link(Module, Args) becomes -%% start_link(Group, Module, Args). +%% start_link(Group, TxFun, Module, Args). %% %% 2) start_link({local, Name}, Module, Args) becomes -%% start_link({local, Name}, Group, Module, Args). +%% start_link({local, Name}, Group, TxFun, Module, Args). %% %% 3) start_link({global, Name}, Module, Args) is not available. %% @@ -70,6 +70,19 @@ %% application should invoke create_tables() (or table_definitions() %% if it wants to manage table creation itself). %% +%% The TxFun parameter to start_link/{4,5} is a function which the +%% mirrored supervisor can use to execute Mnesia transactions. In the +%% RabbitMQ server this goes via a worker pool; in other cases a +%% function like: +%% +%% tx_fun(Fun) -> +%% case mnesia:sync_transaction(Fun) of +%% {atomic, Result} -> Result; +%% {aborted, Reason} -> throw({error, Reason}) +%% end. +%% +%% could be used. +%% %% Internals %% --------- %% @@ -115,7 +128,7 @@ {attributes, record_info(fields, mirrored_sup_childspec)}]}). -define(TABLE_MATCH, {match, #mirrored_sup_childspec{ _ = '_' }}). --export([start_link/3, start_link/4, +-export([start_link/4, start_link/5, start_child/2, restart_child/2, delete_child/2, terminate_child/2, which_children/1, count_children/1, check_childspecs/1]). @@ -126,7 +139,7 @@ -export([init/1, handle_call/3, handle_info/2, terminate/2, code_change/3, handle_cast/2]). --export([start_internal/2]). +-export([start_internal/3]). -export([create_tables/0, table_definitions/0]). -record(mirrored_sup_childspec, {key, mirroring_pid, childspec}). @@ -134,6 +147,7 @@ -record(state, {overall, delegate, group, + tx_fun, initial_childspecs}). %%---------------------------------------------------------------------------- @@ -160,19 +174,25 @@ -type group_name() :: any(). --spec start_link(GroupName, Module, Args) -> startlink_ret() when +-type(tx_fun() :: fun((fun(() -> A)) -> A)). + +-spec start_link(GroupName, TxFun, Module, Args) -> startlink_ret() when GroupName :: group_name(), + TxFun :: tx_fun(), Module :: module(), Args :: term(). --spec start_link(SupName, GroupName, Module, Args) -> startlink_ret() when +-spec start_link(SupName, GroupName, TxFun, Module, Args) -> + startlink_ret() when SupName :: supervisor2:sup_name(), GroupName :: group_name(), + TxFun :: tx_fun(), Module :: module(), Args :: term(). --spec start_internal(Group, ChildSpecs) -> Result when +-spec start_internal(Group, TxFun, ChildSpecs) -> Result when Group :: group_name(), + TxFun :: tx_fun(), ChildSpecs :: [supervisor2:child_spec()], Result :: {'ok', pid()} | {'error', term()}. @@ -190,18 +210,18 @@ %%---------------------------------------------------------------------------- -start_link(Group, Mod, Args) -> - start_link0([], Group, init(Mod, Args)). +start_link(Group, TxFun, Mod, Args) -> + start_link0([], Group, TxFun, init(Mod, Args)). -start_link({local, SupName}, Group, Mod, Args) -> - start_link0([{local, SupName}], Group, init(Mod, Args)); +start_link({local, SupName}, Group, TxFun, Mod, Args) -> + start_link0([{local, SupName}], Group, TxFun, init(Mod, Args)); -start_link({global, _SupName}, _Group, _Mod, _Args) -> +start_link({global, _SupName}, _Group, _TxFun, _Mod, _Args) -> erlang:error(badarg). -start_link0(Prefix, Group, Init) -> +start_link0(Prefix, Group, TxFun, Init) -> case apply(?SUPERVISOR, start_link, - Prefix ++ [?MODULE, {overall, Group, Init}]) of + Prefix ++ [?MODULE, {overall, Group, TxFun, Init}]) of {ok, Pid} -> case catch call(Pid, {init, Pid}) of ok -> {ok, Pid}; E -> E @@ -225,7 +245,9 @@ check_childspecs(Specs) -> ?SUPERVISOR:check_childspecs(Specs). call(Sup, Msg) -> ?GEN_SERVER:call(mirroring(Sup), Msg, infinity). -cast(Sup, Msg) -> ?GEN_SERVER:cast(mirroring(Sup), Msg). +cast(Sup, Msg) -> with_exit_handler( + fun() -> ok end, + fun() -> ?GEN_SERVER:cast(mirroring(Sup), Msg) end). find_call(Sup, Id, Msg) -> Group = call(Sup, group), @@ -257,14 +279,14 @@ %%---------------------------------------------------------------------------- -start_internal(Group, ChildSpecs) -> - ?GEN_SERVER:start_link(?MODULE, {mirroring, Group, ChildSpecs}, +start_internal(Group, TxFun, ChildSpecs) -> + ?GEN_SERVER:start_link(?MODULE, {mirroring, Group, TxFun, ChildSpecs}, [{timeout, infinity}]). %%---------------------------------------------------------------------------- -init({overall, _Group, ignore}) -> ignore; -init({overall, Group, {ok, {Restart, ChildSpecs}}}) -> +init({overall, _Group, _TxFun, ignore}) -> ignore; +init({overall, Group, TxFun, {ok, {Restart, ChildSpecs}}}) -> %% Important: Delegate MUST start before Mirroring so that when we %% shut down from above it shuts down last, so Mirroring does not %% see it die. @@ -273,27 +295,30 @@ {ok, {{one_for_all, 0, 1}, [{delegate, {?SUPERVISOR, start_link, [?MODULE, {delegate, Restart}]}, temporary, 16#ffffffff, supervisor, [?SUPERVISOR]}, - {mirroring, {?MODULE, start_internal, [Group, ChildSpecs]}, + {mirroring, {?MODULE, start_internal, [Group, TxFun, ChildSpecs]}, permanent, 16#ffffffff, worker, [?MODULE]}]}}; init({delegate, Restart}) -> {ok, {Restart, []}}; -init({mirroring, Group, ChildSpecs}) -> - {ok, #state{group = Group, initial_childspecs = ChildSpecs}}. +init({mirroring, Group, TxFun, ChildSpecs}) -> + {ok, #state{group = Group, + tx_fun = TxFun, + initial_childspecs = ChildSpecs}}. handle_call({init, Overall}, _From, State = #state{overall = undefined, delegate = undefined, group = Group, + tx_fun = TxFun, initial_childspecs = ChildSpecs}) -> process_flag(trap_exit, true), ?PG2:create(Group), ok = ?PG2:join(Group, Overall), Rest = ?PG2:get_members(Group) -- [Overall], case Rest of - [] -> {atomic, _} = mnesia:transaction(fun() -> delete_all(Group) end); + [] -> TxFun(fun() -> delete_all(Group) end); _ -> ok end, [begin @@ -303,7 +328,8 @@ Delegate = delegate(Overall), erlang:monitor(process, Delegate), State1 = State#state{overall = Overall, delegate = Delegate}, - case errors([maybe_start(Group, Overall, Delegate, S) || S <- ChildSpecs]) of + case errors([maybe_start(Group, TxFun, Overall, Delegate, S) + || S <- ChildSpecs]) of [] -> {reply, ok, State1}; Errors -> {stop, {shutdown, Errors}, State1} end; @@ -311,16 +337,18 @@ handle_call({start_child, ChildSpec}, _From, State = #state{overall = Overall, delegate = Delegate, - group = Group}) -> - {reply, case maybe_start(Group, Overall, Delegate, ChildSpec) of + group = Group, + tx_fun = TxFun}) -> + {reply, case maybe_start(Group, TxFun, Overall, Delegate, ChildSpec) of already_in_mnesia -> {error, already_present}; {already_in_mnesia, Pid} -> {error, {already_started, Pid}}; Else -> Else end, State}; handle_call({delete_child, Id}, _From, State = #state{delegate = Delegate, - group = Group}) -> - {reply, stop(Group, Delegate, Id), State}; + group = Group, + tx_fun = TxFun}) -> + {reply, stop(Group, TxFun, Delegate, Id), State}; handle_call({msg, F, A}, _From, State = #state{delegate = Delegate}) -> {reply, apply(?SUPERVISOR, F, [Delegate | A]), State}; @@ -343,7 +371,7 @@ {stop, {unexpected_cast, Msg}, State}. handle_info({'DOWN', _Ref, process, Pid, Reason}, - State = #state{overall = Pid, group = Group}) -> + State = #state{delegate = Pid, group = Group}) -> %% Since the delegate is temporary, its death won't cause us to %% die. Since the overall supervisor kills processes in reverse %% order when shutting down "from above" and we started after the @@ -357,14 +385,15 @@ {stop, Reason, State}; handle_info({'DOWN', _Ref, process, Pid, _Reason}, - State = #state{delegate = Delegate, group = Group, - overall = O}) -> + State = #state{delegate = Delegate, + group = Group, + tx_fun = TxFun, + overall = O}) -> %% TODO load balance this %% No guarantee pg2 will have received the DOWN before us. R = case lists:sort(?PG2:get_members(Group)) -- [Pid] of - [O | _] -> {atomic, ChildSpecs} = - mnesia:transaction( - fun() -> update_all(O, Pid) end), + [O | _] -> ChildSpecs = + TxFun(fun() -> update_all(O, Pid) end), [start(Delegate, ChildSpec) || ChildSpec <- ChildSpecs]; _ -> [] end, @@ -387,14 +416,14 @@ tell_all_peers_to_die(Group, Reason) -> [cast(P, {die, Reason}) || P <- ?PG2:get_members(Group) -- [self()]]. -maybe_start(Group, Overall, Delegate, ChildSpec) -> - case mnesia:transaction( - fun() -> check_start(Group, Overall, Delegate, ChildSpec) end) of - {atomic, start} -> start(Delegate, ChildSpec); - {atomic, undefined} -> already_in_mnesia; - {atomic, Pid} -> {already_in_mnesia, Pid}; +maybe_start(Group, TxFun, Overall, Delegate, ChildSpec) -> + try TxFun(fun() -> check_start(Group, Overall, Delegate, ChildSpec) end) of + start -> start(Delegate, ChildSpec); + undefined -> already_in_mnesia; + Pid -> {already_in_mnesia, Pid} + catch %% If we are torn down while in the transaction... - {aborted, E} -> {error, E} + {error, E} -> {error, E} end. check_start(Group, Overall, Delegate, ChildSpec) -> @@ -429,11 +458,12 @@ start(Delegate, ChildSpec) -> apply(?SUPERVISOR, start_child, [Delegate, ChildSpec]). -stop(Group, Delegate, Id) -> - case mnesia:transaction(fun() -> check_stop(Group, Delegate, Id) end) of - {atomic, deleted} -> apply(?SUPERVISOR, delete_child, [Delegate, Id]); - {atomic, running} -> {error, running}; - {aborted, E} -> {error, E} +stop(Group, TxFun, Delegate, Id) -> + try TxFun(fun() -> check_stop(Group, Delegate, Id) end) of + deleted -> apply(?SUPERVISOR, delete_child, [Delegate, Id]); + running -> {error, running} + catch + {error, E} -> {error, E} end. check_stop(Group, Delegate, Id) -> diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/rabbitmq-server-3.2.3/src/mirrored_supervisor_tests.erl new/rabbitmq-server-3.2.4/src/mirrored_supervisor_tests.erl --- old/rabbitmq-server-3.2.3/src/mirrored_supervisor_tests.erl 2014-01-23 15:57:34.000000000 +0100 +++ new/rabbitmq-server-3.2.4/src/mirrored_supervisor_tests.erl 2014-03-03 17:22:14.000000000 +0100 @@ -175,14 +175,14 @@ test_unsupported() -> try - ?MS:start_link({global, foo}, get_group(group), ?MODULE, + ?MS:start_link({global, foo}, get_group(group), fun tx_fun/1, ?MODULE, {sup, one_for_one, []}), exit(no_global) catch error:badarg -> ok end, try - ?MS:start_link({local, foo}, get_group(group), ?MODULE, + ?MS:start_link({local, foo}, get_group(group), fun tx_fun/1, ?MODULE, {sup, simple_one_for_one, []}), exit(no_sofo) catch error:badarg -> @@ -192,7 +192,7 @@ %% Just test we don't blow up test_ignore() -> - ?MS:start_link({local, foo}, get_group(group), ?MODULE, + ?MS:start_link({local, foo}, get_group(group), fun tx_fun/1, ?MODULE, {sup, fake_strategy_for_ignore, []}), passed. @@ -202,7 +202,7 @@ test_startup_failure(Fail) -> process_flag(trap_exit, true), - ?MS:start_link(get_group(group), ?MODULE, + ?MS:start_link(get_group(group), fun tx_fun/1, ?MODULE, {sup, one_for_one, [childspec(Fail)]}), receive {'EXIT', _, shutdown} -> @@ -236,10 +236,11 @@ start_sup({Name, []}, Group). start_sup0(anon, Group, ChildSpecs) -> - ?MS:start_link(Group, ?MODULE, {sup, one_for_one, ChildSpecs}); + ?MS:start_link(Group, fun tx_fun/1, ?MODULE, + {sup, one_for_one, ChildSpecs}); start_sup0(Name, Group, ChildSpecs) -> - ?MS:start_link({local, Name}, Group, ?MODULE, + ?MS:start_link({local, Name}, Group, fun tx_fun/1, ?MODULE, {sup, one_for_one, ChildSpecs}). childspec(Id) -> @@ -258,6 +259,12 @@ {received, Pid, ping} = call(Id, ping), Pid. +tx_fun(Fun) -> + case mnesia:sync_transaction(Fun) of + {atomic, Result} -> Result; + {aborted, Reason} -> throw({error, Reason}) + end. + inc_group() -> Count = case get(counter) of undefined -> 0; diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/rabbitmq-server-3.2.3/src/rabbit_autoheal.erl new/rabbitmq-server-3.2.4/src/rabbit_autoheal.erl --- old/rabbitmq-server-3.2.3/src/rabbit_autoheal.erl 2014-01-23 15:57:34.000000000 +0100 +++ new/rabbitmq-server-3.2.4/src/rabbit_autoheal.erl 2014-03-03 17:22:14.000000000 +0100 @@ -75,10 +75,16 @@ enabled() -> {ok, autoheal} =:= application:get_env(rabbit, cluster_partition_handling). -node_down(_Node, {winner_waiting, _Nodes, _Notify} = Autoheal) -> - Autoheal; node_down(_Node, not_healing) -> not_healing; + +node_down(Node, {winner_waiting, _, Notify}) -> + rabbit_log:info("Autoheal: aborting - ~p went down~n", [Node]), + %% Make sure any nodes waiting for us start - it won't necessarily + %% heal the partition but at least they won't get stuck. + notify_safe(Notify), + not_healing; + node_down(Node, _State) -> rabbit_log:info("Autoheal: aborting - ~p went down~n", [Node]), not_healing. @@ -144,7 +150,7 @@ handle_msg({node_stopped, Node}, {winner_waiting, [Node], Notify}, _Partitions) -> rabbit_log:info("Autoheal: final node has stopped, starting...~n",[]), - [{rabbit_outside_app_process, N} ! autoheal_safe_to_start || N <- Notify], + notify_safe(Notify), not_healing; handle_msg({node_stopped, Node}, @@ -163,6 +169,9 @@ send(Node, Msg) -> {?SERVER, Node} ! {autoheal_msg, Msg}. +notify_safe(Notify) -> + [{rabbit_outside_app_process, N} ! autoheal_safe_to_start || N <- Notify]. + make_decision(AllPartitions) -> Sorted = lists:sort([{partition_value(P), P} || P <- AllPartitions]), [[Winner | _] | Rest] = lists:reverse([P || {_, P} <- Sorted]), diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/rabbitmq-server-3.2.3/src/rabbit_binding.erl new/rabbitmq-server-3.2.4/src/rabbit_binding.erl --- old/rabbitmq-server-3.2.3/src/rabbit_binding.erl 2014-01-23 15:57:34.000000000 +0100 +++ new/rabbitmq-server-3.2.4/src/rabbit_binding.erl 2014-03-03 17:22:14.000000000 +0100 @@ -200,13 +200,15 @@ binding_action( Binding, fun (Src, Dst, B) -> - case mnesia:read(rabbit_route, B, write) =:= [] andalso - mnesia:read(rabbit_durable_route, B, write) =/= [] of - true -> rabbit_misc:const({error, binding_not_found}); - false -> case InnerFun(Src, Dst) of - ok -> remove(Src, Dst, B); - {error, _} = Err -> rabbit_misc:const(Err) - end + case mnesia:read(rabbit_route, B, write) of + [] -> case mnesia:read(rabbit_durable_route, B, write) of + [] -> rabbit_misc:const(ok); + _ -> rabbit_misc:const({error, binding_not_found}) + end; + _ -> case InnerFun(Src, Dst) of + ok -> remove(Src, Dst, B); + {error, _} = Err -> rabbit_misc:const(Err) + end end end, fun absent_errs_only/1). diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/rabbitmq-server-3.2.3/src/rabbit_node_monitor.erl new/rabbitmq-server-3.2.4/src/rabbit_node_monitor.erl --- old/rabbitmq-server-3.2.3/src/rabbit_node_monitor.erl 2014-01-23 15:57:34.000000000 +0100 +++ new/rabbitmq-server-3.2.4/src/rabbit_node_monitor.erl 2014-03-03 17:22:14.000000000 +0100 @@ -268,8 +268,7 @@ {noreply, State#state{subscribers = pmon:erase(Pid, Subscribers)}}; handle_info({nodedown, Node}, State) -> - ok = handle_dead_node(Node), - {noreply, State}; + {noreply, handle_dead_node(Node, State)}; handle_info({mnesia_system_event, {inconsistent_database, running_partitioned_network, Node}}, @@ -341,7 +340,7 @@ ok = rabbit_mnesia:on_node_down(Node), ok. -handle_dead_node(_Node) -> +handle_dead_node(Node, State = #state{autoheal = Autoheal}) -> %% In general in rabbit_node_monitor we care about whether the %% rabbit application is up rather than the node; we do this so %% that we can respond in the same way to "rabbitmqctl stop_app" @@ -354,17 +353,17 @@ case application:get_env(rabbit, cluster_partition_handling) of {ok, pause_minority} -> case majority() of - true -> ok; - false -> await_cluster_recovery() + true -> State; + false -> await_cluster_recovery() %% Does not really return end; {ok, ignore} -> - ok; + State; {ok, autoheal} -> - ok; + State#state{autoheal = rabbit_autoheal:node_down(Node, Autoheal)}; {ok, Term} -> rabbit_log:warning("cluster_partition_handling ~p unrecognised, " "assuming 'ignore'~n", [Term]), - ok + State end. await_cluster_recovery() -> @@ -397,8 +396,7 @@ wait_for_cluster_recovery(Nodes) end. -handle_dead_rabbit_state(Node, State = #state{partitions = Partitions, - autoheal = Autoheal}) -> +handle_dead_rabbit_state(_Node, State = #state{partitions = Partitions}) -> %% If we have been partitioned, and we are now in the only remaining %% partition, we no longer care about partitions - forget them. Note %% that we do not attempt to deal with individual (other) partitions @@ -408,9 +406,8 @@ [] -> []; _ -> Partitions end, - ensure_ping_timer( - State#state{partitions = Partitions1, - autoheal = rabbit_autoheal:node_down(Node, Autoheal)}). + ensure_ping_timer(State#state{partitions = Partitions1}). + ensure_ping_timer(State) -> rabbit_misc:ensure_timer( diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/rabbitmq-server-3.2.3/src/rabbit_reader.erl new/rabbitmq-server-3.2.4/src/rabbit_reader.erl --- old/rabbitmq-server-3.2.3/src/rabbit_reader.erl 2014-01-23 15:57:34.000000000 +0100 +++ new/rabbitmq-server-3.2.4/src/rabbit_reader.erl 2014-03-03 17:22:14.000000000 +0100 @@ -44,8 +44,7 @@ client_properties, capabilities, auth_mechanism, auth_state}). --record(throttle, {alarmed_by, last_blocked_by, last_blocked_at, - blocked_sent}). +-record(throttle, {alarmed_by, last_blocked_by, last_blocked_at}). -define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt, send_pend, state, last_blocked_by, last_blocked_age, @@ -243,8 +242,7 @@ throttle = #throttle{ alarmed_by = [], last_blocked_by = none, - last_blocked_at = never, - blocked_sent = false}}, + last_blocked_at = never}}, try run({?MODULE, recvloop, [Deb, switch_callback(rabbit_event:init_stats_timer( @@ -323,14 +321,19 @@ throw({inet_error, Reason}). handle_other({conserve_resources, Source, Conserve}, - State = #v1{throttle = Throttle = - #throttle{alarmed_by = CR}}) -> + State = #v1{throttle = Throttle = #throttle{alarmed_by = CR}}) -> CR1 = case Conserve of true -> lists:usort([Source | CR]); false -> CR -- [Source] end, - Throttle1 = Throttle#throttle{alarmed_by = CR1}, - control_throttle(State#v1{throttle = Throttle1}); + State1 = control_throttle( + State#v1{throttle = Throttle#throttle{alarmed_by = CR1}}), + case {blocked_by_alarm(State), blocked_by_alarm(State1)} of + {false, true} -> ok = send_blocked(State1); + {true, false} -> ok = send_unblocked(State1); + {_, _} -> ok + end, + State1; handle_other({channel_closing, ChPid}, State) -> ok = rabbit_channel:ready_for_close(ChPid), channel_cleanup(ChPid), @@ -422,10 +425,7 @@ {blocking, false} -> State#v1{connection_state = running}; {blocked, false} -> ok = rabbit_heartbeat:resume_monitor( State#v1.heartbeater), - maybe_send_unblocked(State), - State#v1{connection_state = running, - throttle = Throttle#throttle{ - blocked_sent = false}}; + State#v1{connection_state = running}; {blocked, true} -> State#v1{throttle = update_last_blocked_by( Throttle)}; {_, _} -> State @@ -434,37 +434,49 @@ maybe_block(State = #v1{connection_state = blocking, throttle = Throttle}) -> ok = rabbit_heartbeat:pause_monitor(State#v1.heartbeater), - Sent = maybe_send_blocked(State), - State#v1{connection_state = blocked, - throttle = update_last_blocked_by( - Throttle#throttle{last_blocked_at = erlang:now(), - blocked_sent = Sent})}; + State1 = State#v1{connection_state = blocked, + throttle = update_last_blocked_by( + Throttle#throttle{ + last_blocked_at = erlang:now()})}, + case {blocked_by_alarm(State), blocked_by_alarm(State1)} of + {false, true} -> ok = send_blocked(State1); + {_, _} -> ok + end, + State1; maybe_block(State) -> State. -maybe_send_blocked(#v1{throttle = #throttle{alarmed_by = []}}) -> - false; -maybe_send_blocked(#v1{throttle = #throttle{alarmed_by = CR}, - connection = #connection{ - protocol = Protocol, - capabilities = Capabilities}, - sock = Sock}) -> + +blocked_by_alarm(#v1{connection_state = blocked, + throttle = #throttle{alarmed_by = CR}}) + when CR =/= [] -> + true; +blocked_by_alarm(#v1{}) -> + false. + +send_blocked(#v1{throttle = #throttle{alarmed_by = CR}, + connection = #connection{protocol = Protocol, + capabilities = Capabilities}, + sock = Sock}) -> case rabbit_misc:table_lookup(Capabilities, <<"connection.blocked">>) of {bool, true} -> RStr = string:join([atom_to_list(A) || A <- CR], " & "), Reason = list_to_binary(rabbit_misc:format("low on ~s", [RStr])), ok = send_on_channel0(Sock, #'connection.blocked'{reason = Reason}, - Protocol), - true; + Protocol); _ -> - false + ok end. -maybe_send_unblocked(#v1{throttle = #throttle{blocked_sent = false}}) -> - ok; -maybe_send_unblocked(#v1{connection = #connection{protocol = Protocol}, - sock = Sock}) -> - ok = send_on_channel0(Sock, #'connection.unblocked'{}, Protocol). +send_unblocked(#v1{connection = #connection{protocol = Protocol, + capabilities = Capabilities}, + sock = Sock}) -> + case rabbit_misc:table_lookup(Capabilities, <<"connection.blocked">>) of + {bool, true} -> + ok = send_on_channel0(Sock, #'connection.unblocked'{}, Protocol); + _ -> + ok + end. update_last_blocked_by(Throttle = #throttle{alarmed_by = []}) -> Throttle#throttle{last_blocked_by = flow}; diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/rabbitmq-server-3.2.3/src/rabbit_tests.erl new/rabbitmq-server-3.2.4/src/rabbit_tests.erl --- old/rabbitmq-server-3.2.3/src/rabbit_tests.erl 2014-01-23 15:57:34.000000000 +0100 +++ new/rabbitmq-server-3.2.4/src/rabbit_tests.erl 2014-03-03 17:22:14.000000000 +0100 @@ -1042,6 +1042,9 @@ ok = control_action(add_vhost, ["/testhost"]), ok = control_action(set_permissions, ["foo", ".*", ".*", ".*"], [{"-p", "/testhost"}]), + {new, _} = rabbit_amqqueue:declare( + rabbit_misc:r(<<"/testhost">>, queue, <<"test">>), + true, false, [], none), ok = control_action(delete_vhost, ["/testhost"]), %% user deletion diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/rabbitmq-server-3.2.3/src/rabbit_vhost.erl new/rabbitmq-server-3.2.4/src/rabbit_vhost.erl --- old/rabbitmq-server-3.2.3/src/rabbit_vhost.erl 2014-01-23 15:57:34.000000000 +0100 +++ new/rabbitmq-server-3.2.4/src/rabbit_vhost.erl 2014-03-03 17:22:14.000000000 +0100 @@ -81,9 +81,9 @@ %% eventually the termination of that process. Exchange deletion causes %% notifications which must be sent outside the TX rabbit_log:info("Deleting vhost '~s'~n", [VHostPath]), - [{ok,_} = rabbit_amqqueue:delete(Q, false, false) || + [assert_benign(rabbit_amqqueue:delete(Q, false, false)) || Q <- rabbit_amqqueue:list(VHostPath)], - [ok = rabbit_exchange:delete(Name, false) || + [assert_benign(rabbit_exchange:delete(Name, false)) || #exchange{name = Name} <- rabbit_exchange:list(VHostPath)], R = rabbit_misc:execute_mnesia_transaction( with(VHostPath, fun () -> @@ -92,6 +92,18 @@ ok = rabbit_event:notify(vhost_deleted, [{name, VHostPath}]), R. +assert_benign(ok) -> ok; +assert_benign({ok, _}) -> ok; +assert_benign({error, not_found}) -> ok; +assert_benign({error, {absent, Q}}) -> + %% We have a durable queue on a down node. Removing the mnesia + %% entries here is safe. If/when the down node restarts, it will + %% clear out the on-disk storage of the queue. + case rabbit_amqqueue:internal_delete(Q#amqqueue.name) of + ok -> ok; + {error, not_found} -> ok + end. + internal_delete(VHostPath) -> [ok = rabbit_auth_backend_internal:clear_permissions( proplists:get_value(user, Info), VHostPath) diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/rabbitmq-server-3.2.3/version.mk new/rabbitmq-server-3.2.4/version.mk --- old/rabbitmq-server-3.2.3/version.mk 2014-01-23 15:57:34.000000000 +0100 +++ new/rabbitmq-server-3.2.4/version.mk 2014-03-03 17:22:14.000000000 +0100 @@ -1 +1 @@ -VERSION?=3.2.3 +VERSION?=3.2.4 -- To unsubscribe, e-mail: opensuse-commit+unsubscr...@opensuse.org For additional commands, e-mail: opensuse-commit+h...@opensuse.org