This plugin takes an entirely different approach to dealing with ActiveMQ
from the more generic stomp connector.

 - Agents all use /topic/<collective>.<agent>.agent
 - Replies use temp-topics so they are private and transient.
 - Point to Point messages using topics are supported by subscribing to
   /topic/<collective>.nodes with a selector "mc_identity = 'identity'

The use of temp-topics for the replies is a huge improvement over the old style.
In the old way all clients got replies for all clients that were active at that
time, this would mean that they would need to decrypt, validate etc in order to
determine if they need to ignore the message, this was computationaly expensive 
and
on large busy networks the messages were being sent all over the show cross 
broker
boundaries.

The new way means the messages go point2point back to only whoever requested the
message, they only get their own replies and this is ap private channel that
casual observers cannot just snoop into.

It only supports ActiveMQ 5.5.0 or newer and Stomp gems newer than 1.1.6

Signed-off-by: R.I.Pienaar <r...@devco.net>
---
Local-branch: feature/master/7899
 plugins/mcollective/connector/activemq.rb          |  255 +++++++++++++
 .../plugins/mcollective/connector/activemq_spec.rb |  384 ++++++++++++++++++++
 website/changelog.md                               |    1 +
 website/reference/index.md                         |    2 +
 website/reference/plugins/connector_activemq.md    |   77 ++++
 5 files changed, 719 insertions(+), 0 deletions(-)
 create mode 100644 plugins/mcollective/connector/activemq.rb
 create mode 100644 spec/unit/plugins/mcollective/connector/activemq_spec.rb
 create mode 100644 website/reference/plugins/connector_activemq.md

diff --git a/plugins/mcollective/connector/activemq.rb 
b/plugins/mcollective/connector/activemq.rb
new file mode 100644
index 0000000..675d94e
--- /dev/null
+++ b/plugins/mcollective/connector/activemq.rb
@@ -0,0 +1,255 @@
+require 'stomp'
+
+module MCollective
+    module Connector
+        # Handles sending and receiving messages over the Stomp protocol for 
ActiveMQ
+        # servers specifically, we take advantages of ActiveMQ specific 
features and
+        # enhancements to the Stomp protocol.  For best results in a clustered 
environment
+        # use ActiveMQ 5.5.0 at least.
+        #
+        # This plugin takes an entirely different approach to dealing with 
ActiveMQ
+        # from the more generic stomp connector.
+        #
+        #  - Agents use /topic/<collective>.<agent>.agent
+        #  - Replies use temp-topics so they are private and transient.
+        #  - Point to Point messages using topics are supported by subscribing 
to
+        #    /topic/<collective>.nodes with a selector "mc_identity = 
'identity'
+        #
+        # The use of temp-topics for the replies is a huge improvement over 
the old style.
+        # In the old way all clients got replies for all clients that were 
active at that
+        # time, this would mean that they would need to decrypt, validate etc 
in order to
+        # determine if they need to ignore the message, this was 
computationally expensive
+        # and on large busy networks the messages were being sent all over the 
show cross
+        # broker boundaries.
+        #
+        # The new way means the messages go point2point back to only whoever 
requested the
+        # message, they only get their own replies and this is ap private 
channel that
+        # casual observers cannot just snoop into.
+        #
+        # This plugin supports 1.1.6 and newer of the Stomp rubygem
+        #
+        #    connector = activemq
+        #    plugin.activemq.pool.size = 2
+        #
+        #    plugin.activemq.pool.1.host = stomp1.your.net
+        #    plugin.activemq.pool.1.port = 6163
+        #    plugin.activemq.pool.1.user = you
+        #    plugin.activemq.pool.1.password = secret
+        #    plugin.activemq.pool.1.ssl = true
+        #
+        #    plugin.activemq.pool.2.host = stomp2.your.net
+        #    plugin.activemq.pool.2.port = 6163
+        #    plugin.activemq.pool.2.user = you
+        #    plugin.activemq.pool.2.password = secret
+        #    plugin.activemq.pool.2.ssl = false
+        #
+        # Using this method you can supply just STOMP_USER and STOMP_PASSWORD. 
 The port will
+        # default to 61613 if not specified.
+        #
+        # In addition you can set the following options for the rubygem:
+        #
+        #     plugin.activemq.initial_reconnect_delay = 0.01
+        #     plugin.activemq.max_reconnect_delay = 30.0
+        #     plugin.activemq.use_exponential_back_off = true
+        #     plugin.activemq.back_off_multiplier = 2
+        #     plugin.activemq.max_reconnect_attempts = 0
+        #     plugin.activemq.randomize = false
+        #     plugin.activemq.timeout = -1
+        #
+        # ActiveMQ JMS message priorities can be set:
+        #
+        #     plugin.activemq.priority = 4
+        #
+        class Activemq<Base
+            attr_reader :connection
+
+            def initialize
+                @config = Config.instance
+                @subscriptions = []
+                @msgpriority = 0
+                @base64 = false
+            end
+
+            # Connects to the ActiveMQ middleware
+            def connect(connector = ::Stomp::Connection)
+                if @connection
+                    Log.debug("Already connection, not re-initializing 
connection")
+                    return
+                end
+
+                begin
+                    @base64 = get_bool_option("activemq.base64", false)
+                    @msgpriority = get_option("activemq.priority", 0).to_i
+
+                    pools = @config.pluginconf["activemq.pool.size"].to_i
+                    hosts = []
+
+                    1.upto(pools) do |poolnum|
+                        host = {}
+
+                        host[:host] = 
get_option("activemq.pool.#{poolnum}.host")
+                        host[:port] = 
get_option("activemq.pool.#{poolnum}.port", 6163).to_i
+                        host[:login] = get_env_or_option("STOMP_USER", 
"activemq.pool.#{poolnum}.user")
+                        host[:passcode] = get_env_or_option("STOMP_PASSWORD", 
"activemq.pool.#{poolnum}.password")
+                        host[:ssl] = 
get_bool_option("activemq.pool.#{poolnum}.ssl", false)
+
+                        Log.debug("Adding #{host[:host]}:#{host[:port]} to the 
connection pool")
+                        hosts << host
+                    end
+
+                    raise "No hosts found for the ActiveMQ connection pool" if 
hosts.size == 0
+
+                    connection = {:hosts => hosts}
+
+                    # Various STOMP gem options, defaults here matches 
defaults for 1.1.6 the meaning of
+                    # these can be guessed, the documentation isn't clear
+                    connection[:initial_reconnect_delay] = 
get_option("activemq.initial_reconnect_delay", 0.01).to_f
+                    connection[:max_reconnect_delay] = 
get_option("activemq.max_reconnect_delay", 30.0).to_f
+                    connection[:use_exponential_back_off] = 
get_bool_option("activemq.use_exponential_back_off", true)
+                    connection[:back_off_multiplier] = 
get_bool_option("activemq.back_off_multiplier", 2).to_i
+                    connection[:max_reconnect_attempts] = 
get_option("activemq.max_reconnect_attempts", 0).to_i
+                    connection[:randomize] = 
get_bool_option("activemq.randomize", false)
+                    connection[:backup] = get_bool_option("activemq.backup", 
false)
+                    connection[:timeout] = get_option("activemq.timeout", 
-1).to_i
+
+                    @connection = connector.new(connection)
+                rescue Exception => e
+                    raise("Could not connect to ActiveMQ Server: #{e}")
+                end
+            end
+
+            # Receives a message from the ActiveMQ connection
+            def receive
+                Log.debug("Waiting for a message from ActiveMQ")
+                msg = @connection.receive
+
+                Message.new(msg.body, msg, :base64 => @base64, :headers => 
msg.headers)
+            end
+
+            # Sends a message to the ActiveMQ connection
+            def publish(msg)
+                msg.base64_encode! if @base64
+
+                target = target_for(msg)
+                target[:headers].merge!(headers_for(msg))
+
+                Log.debug("Sending a message to ActiveMQ target 
'#{target[:name]}' with headers '#{target[:headers].keys}'")
+
+                @connection.publish(target[:name], msg.payload, 
target[:headers])
+            end
+
+            # Subscribe to a topic or queue
+            def subscribe(agent, type, collective)
+                source = make_target(agent, type, collective)
+
+                unless @subscriptions.include?(source)
+                    Log.debug("Subscribing to #{source[:name]} with headers 
#{source[:headers].keys}")
+                    @connection.subscribe(source[:name], source[:headers])
+                    @subscriptions << source
+                end
+            end
+
+            # Subscribe to a topic or queue
+            def unsubscribe(agent, type, collective)
+                source = make_target(agent, type, collective)
+
+                Log.debug("Unsubscribing from #{source[:name]}")
+                @connection.unsubscribe(source[:name], source[:headers])
+                @subscriptions.delete(source)
+            end
+
+            def target_for(msg)
+                if msg.type == :reply
+                    target = {:name => msg.request.headers["reply-to"], 
:headers => {}}
+                elsif msg.type == :request
+                    target = make_target(msg.agent, :request, msg.collective)
+                else
+                    raise "Don't now how to create a target for message type 
#{msg.type}"
+                end
+
+                return target
+            end
+
+            # Disconnects from the ActiveMQ connection
+            def disconnect
+                Log.debug("Disconnecting from ActiveMQ")
+                @connection.disconnect
+            end
+
+            def headers_for(msg)
+                headers = {}
+                headers = {"priority" => @msgpriority} if @msgpriority > 0
+
+                if msg.type == :request
+                    target = make_target(msg.agent, :reply, msg.collective)
+                    headers["reply-to"] = target[:name]
+                end
+
+                return headers
+            end
+
+            def make_target(agent, type, collective)
+                raise("Unknown target type #{type}") unless [:directed, 
:broadcast, :reply, :request].include?(type)
+                raise("Unknown collective '#{collective}' known collectives 
are '#{@config.collectives.join ', '}'") unless 
@config.collectives.include?(collective)
+
+                target = {:name => nil, :headers => {}}
+
+                case type
+                    when :reply
+                        target[:name] = ["/temp-topic/" + collective, 
:reply].join(".")
+
+                    when :broadcast
+                        target[:name] = ["/topic/" + collective, :agent, 
agent].join(".")
+
+                    when :request
+                        target[:name] = ["/topic/" + collective, :agent, 
agent].join(".")
+
+                    when :directed
+                        target[:name] = ["/topic/" + collective, 
:nodes].join(".")
+                        target[:headers]["selector"] = "mc_identity = 
'#{@config.identity}'"
+                end
+
+                target
+            end
+
+            # looks in the environment first then in the config file
+            # for a specific option, accepts an optional default.
+            #
+            # raises an exception when it cant find a value anywhere
+            def get_env_or_option(env, opt, default=nil)
+                return ENV[env] if ENV.include?(env)
+                return @config.pluginconf[opt] if 
@config.pluginconf.include?(opt)
+                return default if default
+
+                raise("No #{env} environment or plugin.#{opt} configuration 
option given")
+            end
+
+            # looks for a config option, accepts an optional default
+            #
+            # raises an exception when it cant find a value anywhere
+            def get_option(opt, default=nil)
+                return @config.pluginconf[opt] if 
@config.pluginconf.include?(opt)
+                return default if default
+
+                raise("No plugin.#{opt} configuration option given")
+            end
+
+            # gets a boolean option from the config, supports 
y/n/true/false/1/0
+            def get_bool_option(opt, default)
+                return default unless @config.pluginconf.include?(opt)
+
+                val = @config.pluginconf[opt]
+
+                if val =~ /^1|yes|true/
+                    return true
+                elsif val =~ /^0|no|false/
+                    return false
+                else
+                    return default
+                end
+            end
+        end
+    end
+end
+
+# vi:tabstop=4:expandtab:ai
diff --git a/spec/unit/plugins/mcollective/connector/activemq_spec.rb 
b/spec/unit/plugins/mcollective/connector/activemq_spec.rb
new file mode 100644
index 0000000..e69caec
--- /dev/null
+++ b/spec/unit/plugins/mcollective/connector/activemq_spec.rb
@@ -0,0 +1,384 @@
+#!/usr/bin/env ruby
+
+require File.dirname(__FILE__) + '/../../../../spec_helper'
+
+MCollective::PluginManager.clear
+
+require File.dirname(__FILE__) + 
'/../../../../../plugins/mcollective/connector/activemq.rb'
+
+module MCollective
+    module Connector
+        describe Activemq do
+            before do
+                @config = mock
+                @config.stubs(:configured).returns(true)
+                @config.stubs(:identity).returns("rspec")
+                @config.stubs(:collectives).returns(["mcollective"])
+
+                logger = mock
+                logger.stubs(:log)
+                logger.stubs(:start)
+                Log.configure(logger)
+
+                Config.stubs(:instance).returns(@config)
+
+                @msg = mock
+                @msg.stubs(:base64_encode!)
+                @msg.stubs(:payload).returns("msg")
+                @msg.stubs(:agent).returns("agent")
+                @msg.stubs(:type).returns(:reply)
+                @msg.stubs(:collective).returns("mcollective")
+
+                @subscription = mock
+                @subscription.stubs("<<").returns(true)
+                @subscription.stubs("include?").returns(false)
+                @subscription.stubs("delete").returns(false)
+
+                @connection = mock
+                @connection.stubs(:subscribe).returns(true)
+                @connection.stubs(:unsubscribe).returns(true)
+
+                @c = Activemq.new
+                @c.instance_variable_set("@subscriptions", @subscription)
+                @c.instance_variable_set("@connection", @connection)
+            end
+
+            describe "#initialize" do
+                it "should set the @config variable" do
+                    c = Activemq.new
+                    c.instance_variable_get("@config").should == @config
+                end
+
+                it "should set @subscriptions to an empty list" do
+                    c = Activemq.new
+                    c.instance_variable_get("@subscriptions").should == []
+                end
+            end
+
+            describe "#connect" do
+                it "should not try to reconnect if already connected" do
+                    Log.expects(:debug).with("Already connection, not 
re-initializing connection").once
+                    @c.connect
+                end
+
+                it "should support new style config" do
+                    pluginconf = {"activemq.pool.size" => "2",
+                                  "activemq.pool.1.host" => "host1",
+                                  "activemq.pool.1.port" => "6163",
+                                  "activemq.pool.1.user" => "user1",
+                                  "activemq.pool.1.password" => "password1",
+                                  "activemq.pool.1.ssl" => "false",
+                                  "activemq.pool.2.host" => "host2",
+                                  "activemq.pool.2.port" => "6164",
+                                  "activemq.pool.2.user" => "user2",
+                                  "activemq.pool.2.password" => "password2",
+                                  "activemq.pool.2.ssl" => "true",
+                                  "activemq.initial_reconnect_delay" => "0.02",
+                                  "activemq.max_reconnect_delay" => "40",
+                                  "activemq.use_exponential_back_off" => 
"false",
+                                  "activemq.back_off_multiplier" => "3",
+                                  "activemq.max_reconnect_attempts" => "5",
+                                  "activemq.randomize" => "true",
+                                  "activemq.backup" => "true",
+                                  "activemq.timeout" => "1"}
+
+
+                    ENV.delete("STOMP_USER")
+                    ENV.delete("STOMP_PASSWORD")
+
+                    
@config.expects(:pluginconf).returns(pluginconf).at_least_once
+
+                    connector = mock
+                    connector.expects(:new).with(:backup => true,
+                                                 :back_off_multiplier => 2,
+                                                 :max_reconnect_delay => 40.0,
+                                                 :timeout => 1,
+                                                 :use_exponential_back_off => 
false,
+                                                 :max_reconnect_attempts => 5,
+                                                 :initial_reconnect_delay => 
0.02,
+                                                 :randomize => true,
+                                                 :hosts => [{:passcode => 
'password1',
+                                                             :host => 'host1',
+                                                             :port => 6163,
+                                                             :ssl => false,
+                                                             :login => 
'user1'},
+                                                            {:passcode => 
'password2',
+                                                             :host => 'host2',
+                                                             :port => 6164,
+                                                             :ssl => true,
+                                                             :login => 'user2'}
+                                                           ])
+
+                    @c.instance_variable_set("@connection", nil)
+                    @c.connect(connector)
+                end
+            end
+
+            describe "#receive" do
+                it "should receive from the middleware" do
+                    payload = mock
+                    payload.stubs(:body).returns("msg")
+                    payload.stubs(:headers).returns("headers")
+
+                    @connection.expects(:receive).returns(payload)
+
+                    Message.expects(:new).with("msg", payload, :base64 => 
true, :headers => "headers").returns("message")
+                    @c.instance_variable_set("@base64", true)
+
+                    received = @c.receive
+                    received.should == "message"
+                end
+            end
+
+            describe "#publish" do
+                before do
+                    @connection.stubs(:publish).with("test", "msg", 
{}).returns(true)
+                end
+
+                it "should base64 encode a message if configured to do so" do
+                    @c.instance_variable_set("@base64", true)
+                    @c.expects(:headers_for).returns({})
+                    @c.expects(:target_for).returns({:name => "test", :headers 
=> {}})
+                    @connection.expects(:publish).with("test", "msg", {})
+                    @msg.expects(:base64_encode!)
+
+                    @c.publish(@msg)
+                end
+
+                it "should not base64 encode if not configured to do so" do
+                    @c.instance_variable_set("@base64", false)
+                    @c.expects(:headers_for).returns({})
+                    @c.expects(:target_for).returns({:name => "test", :headers 
=> {}})
+                    @connection.expects(:publish).with("test", "msg", {})
+                    @msg.expects(:base64_encode!).never
+
+                    @c.publish(@msg)
+                end
+
+                it "should publish the correct message to the correct target 
with msgheaders" do
+                    @connection.expects(:publish).with("test", "msg", {"test" 
=> "test"}).once
+                    @c.expects(:headers_for).returns({"test" => "test"})
+                    @c.expects(:target_for).returns({:name => "test", :headers 
=> {}})
+
+                    @c.publish(@msg)
+                end
+            end
+
+            describe "#subscribe" do
+                it "should use the make_target correctly" do
+                    @c.expects("make_target").with("test", :broadcast, 
"mcollective").returns({:name => "test", :headers => {}})
+                    @c.subscribe("test", :broadcast, "mcollective")
+                end
+
+                it "should check for existing subscriptions" do
+                    @c.expects("make_target").with("test", :broadcast, 
"mcollective").returns({:name => "test", :headers => {}})
+                    @subscription.expects("include?").with({:name => "test", 
:headers => {}}).returns(false)
+                    @connection.expects(:subscribe).never
+
+                    @c.subscribe("test", :broadcast, "mcollective")
+                end
+
+                it "should subscribe to the middleware" do
+                    @c.expects("make_target").with("test", :broadcast, 
"mcollective").returns({:name => "test", :headers => {}})
+                    @connection.expects(:subscribe).with("test", {})
+                    @c.subscribe("test", :broadcast, "mcollective")
+                end
+
+                it "should add to the list of subscriptions" do
+                    @c.expects("make_target").with("test", :broadcast, 
"mcollective").returns({:name => "test", :headers => {}})
+                    @subscription.expects("<<").with({:name => "test", 
:headers => {}})
+                    @c.subscribe("test", :broadcast, "mcollective")
+                end
+            end
+
+            describe "#unsubscribe" do
+                it "should use make_target correctly" do
+                    @c.expects("make_target").with("test", :broadcast, 
"mcollective").returns({:name => "test", :headers => {}})
+                    @c.unsubscribe("test", :broadcast, "mcollective")
+                end
+
+                it "should unsubscribe from the target" do
+                    @c.expects("make_target").with("test", :broadcast, 
"mcollective").returns({:name => "test", :headers => {}})
+                    @connection.expects(:unsubscribe).with("test", {}).once
+
+                    @c.unsubscribe("test", :broadcast, "mcollective")
+                end
+
+                it "should delete the source from subscriptions" do
+                    @c.expects("make_target").with("test", :broadcast, 
"mcollective").returns({:name => "test", :headers => {}})
+                    @subscription.expects(:delete).with({:name => "test", 
:headers => {}}).once
+
+                    @c.unsubscribe("test", :broadcast, "mcollective")
+                end
+            end
+
+            describe "#target_for" do
+                before do
+                end
+
+                it "should create reply targets based on reply-to headers in 
requests" do
+                    message = mock
+                    message.expects(:type).returns(:reply)
+
+                    request = mock
+                    request.expects(:headers).returns({"reply-to" => "foo"})
+
+                    message.expects(:request).returns(request)
+
+                    @c.target_for(message).should == {:name => "foo", :headers 
=> {}}
+                end
+
+                it "should create new request targets" do
+                    message = mock
+                    message.expects(:type).returns(:request).twice
+                    message.expects(:agent).returns("rspecagent")
+                    message.expects(:collective).returns("mcollective")
+
+                    @c.expects(:make_target).with("rspecagent", :request, 
"mcollective")
+                    @c.target_for(message)
+                end
+
+                it "should fail for unknown message types" do
+                    message = mock
+                    message.expects(:type).returns(:fail).times(3)
+
+                    expect {
+                        @c.target_for(message)
+                    }.to raise_error("Don't now how to create a target for 
message type fail")
+                end
+            end
+
+            describe "#disconnect" do
+                it "should disconnect from the stomp connection" do
+                    @connection.expects(:disconnect)
+                    @c.disconnect
+                end
+            end
+
+            describe "#headers_for" do
+                it "should return empty headers if priority is 0" do
+                    message = mock
+                    message.expects(:type).returns(:foo)
+
+                    @c.instance_variable_set("@msgpriority", 0)
+                    @c.headers_for(message).should == {}
+                end
+
+                it "should return a priority if prioritu is non 0" do
+                    message = mock
+                    message.expects(:type).returns(:foo)
+
+                    @c.instance_variable_set("@msgpriority", 1)
+                    @c.headers_for(message).should == {"priority" => 1}
+                end
+
+                it "should set a reply-to header for :request type messages" do
+                    message = mock
+                    message.expects(:type).returns(:request)
+                    message.expects(:agent).returns("rspecagent")
+                    message.expects(:collective).returns("mcollective")
+
+                    @c.instance_variable_set("@msgpriority", 0)
+                    @c.expects(:make_target).with("rspecagent", :reply, 
"mcollective").returns({:name => "test"})
+                    @c.headers_for(message).should == {"reply-to" => "test"}
+                end
+            end
+
+            describe "#make_target" do
+                it "should create correct targets" do
+                    @c.make_target("test", :reply, "mcollective").should == 
{:name => "/temp-topic/mcollective.reply", :headers => {}}
+                    @c.make_target("test", :broadcast, "mcollective").should 
== {:name => "/topic/mcollective.agent.test", :headers => {}}
+                    @c.make_target("test", :request, "mcollective").should == 
{:name => "/topic/mcollective.agent.test", :headers => {}}
+                    @c.make_target("test", :directed, "mcollective").should == 
{:name => "/topic/mcollective.nodes", :headers=>{"selector"=>"mc_identity = 
'rspec'"}}
+                end
+
+                it "should raise an error for unknown collectives" do
+                    expect {
+                        @c.make_target("test", :broadcast, "foo")
+                    }.to raise_error("Unknown collective 'foo' known 
collectives are 'mcollective'")
+                end
+
+                it "should raise an error for unknown types" do
+                    expect {
+                        @c.make_target("test", :test, "mcollective")
+                    }.to raise_error("Unknown target type test")
+                end
+            end
+
+
+            describe "#get_env_or_option" do
+                it "should return the environment variable if set" do
+                    ENV["test"] = "rspec_env_test"
+
+                    @c.get_env_or_option("test", nil, nil).should == 
"rspec_env_test"
+
+                    ENV.delete("test")
+                end
+
+                it "should return the config option if set" do
+                    @config.expects(:pluginconf).returns({"test" => 
"rspec_test"}).twice
+                    @c.get_env_or_option("test", "test", "test").should == 
"rspec_test"
+                end
+
+                it "should return default if nothing else matched" do
+                    @config.expects(:pluginconf).returns({}).once
+                    @c.get_env_or_option("test", "test", "test").should == 
"test"
+                end
+
+                it "should raise an error if no default is supplied" do
+                    @config.expects(:pluginconf).returns({}).once
+
+                    expect {
+                        @c.get_env_or_option("test", "test")
+                    }.to raise_error("No test environment or plugin.test 
configuration option given")
+                end
+            end
+
+            describe "#get_option" do
+                it "should return the config option if set" do
+                    @config.expects(:pluginconf).returns({"test" => 
"rspec_test"}).twice
+                    @c.get_option("test").should == "rspec_test"
+                end
+
+                it "should return default option was not found" do
+                    @config.expects(:pluginconf).returns({}).once
+                    @c.get_option("test", "test").should == "test"
+                end
+
+                it "should raise an error if no default is supplied" do
+                    @config.expects(:pluginconf).returns({}).once
+
+                    expect {
+                        @c.get_option("test")
+                    }.to raise_error("No plugin.test configuration option 
given")
+                end
+            end
+
+            describe "#get_bool_option" do
+                it "should return the default if option isnt set" do
+                    @config.expects(:pluginconf).returns({}).once
+                    @c.get_bool_option("test", "default").should == "default"
+                end
+
+                ["1", "yes", "true"].each do |boolean|
+                    it "should map options to true correctly" do
+                        @config.expects(:pluginconf).returns({"test" => 
boolean}).twice
+                        @c.get_bool_option("test", "default").should == true
+                    end
+                end
+
+                ["0", "no", "false"].each do |boolean|
+                    it "should map options to false correctly" do
+                        @config.expects(:pluginconf).returns({"test" => 
boolean}).twice
+                        @c.get_bool_option("test", "default").should == false
+                    end
+                end
+
+                it "should return default for non boolean options" do
+                        @config.expects(:pluginconf).returns({"test" => 
"foo"}).twice
+                        @c.get_bool_option("test", "default").should == 
"default"
+                end
+            end
+        end
+    end
+end
diff --git a/website/changelog.md b/website/changelog.md
index ee68e2e..b224e81 100644
--- a/website/changelog.md
+++ b/website/changelog.md
@@ -11,6 +11,7 @@ title: Changelog
 
 |Date|Description|Ticket|
 |----|-----------|------|
+|2011/06/14|Introduce an ActiveMQ optimized connector that uses the Stomp gem 
and ActiveMQ extensions|7899|
 |2011/06/12|Remove assumptions about middleware structure from the core and 
move it to the connector plugins|7619|
 |*2011/06/08*|*Release 1.3.0*|7796|
 |2011/06/07|Exceptions raised during option parsing were not handled and 
resulted in stack traces|7796|
diff --git a/website/reference/index.md b/website/reference/index.md
index f6ffcbf..b59a4c2 100644
--- a/website/reference/index.md
+++ b/website/reference/index.md
@@ -30,6 +30,7 @@ Index to basic users documentation.
  1. [ActiveMQ Security](integration/activemq_security.html)
  1. [ActiveMQ SSL](integration/activemq_ssl.html)
  1. [ActiveMQ Clusters](integration/activemq_clusters.html)
+ 1. [ActiveMQ Connector](plugins/connector_activemq.html)
 
 ### Plugins
 
@@ -40,6 +41,7 @@ Index to basic users documentation.
  1. [OpenSSL based Security Plugin](plugins/security_ssl.html)
  1. [Fact Source Plugins](plugins/facts.html)
  1. [Stomp Connector](plugins/connector_stomp.html)
+ 1. [ActiveMQ Connector](plugins/connector_activemq.html)
  1. [User Contributed 
Plugins](http://projects.puppetlabs.com/projects/mcollective-plugins/wiki)
 
 ### Development
diff --git a/website/reference/plugins/connector_activemq.md 
b/website/reference/plugins/connector_activemq.md
new file mode 100644
index 0000000..5250347
--- /dev/null
+++ b/website/reference/plugins/connector_activemq.md
@@ -0,0 +1,77 @@
+---
+layout: default
+title: ActiveMQ Connector
+disqus: true
+---
+[STOMP]: http://stomp.codehaus.org/
+
+# {{page.title}}
+
+The ActiveMQ connector uses the [STOMP] rubygem to connect to ActiveMQ 
servers.  It is specifically optimiszed for ActiveMQ
+and uses features in ActiveMQ 5.5.0 and later.
+
+This code will only work with version _1.1.6_ or newer of the Stomp gem.
+
+## Differences between ActiveMQ connector and Stomp Connector
+
+The ActiveMQ connector requires MCollective 1.3.1 or newer and introduce a new 
structure to the middleware messsages.
+
+ * Replies goes direct to clients using private temporary topics
+ * Agent topics are called _/topic/&lt;collective&gt;.agent.&lt;agent_name&gt;_
+ * Support for point to point messages are added by using 
_/topic/&lt;collective&gt;.nodes_ and using JMS selectors.
+
+The use of temp-topics mean that replies are now going to go back only to the 
person who sent the request.
+This has big impact on overall CPU usage by clients on busy networks but also 
optimize the traffic flow on
+networks with many brokers.
+
+Point to Point messages is a future that will be introduced later on, the 
approach using JMS Selectors means
+internally to ActiveMQ only a single thread will be dedicated to this rather 
than 1 per connected node.
+
+Before using this plugin you will need to make appropriate adjustments to your 
ActiveMQ Access Control Lists.
+
+## Configuring
+
+### Common Options
+The most basic configuration method is supported in all versions of the gem:
+
+### Failover Pools
+A sample configuration can be seen below.  Note this plugin does not support 
the old style config of the Stomp connector.
+
+{% highlight ini %}
+connector = activemq
+plugin.activemq.pool.size = 2
+plugin.activemq.pool.1.host = stomp1
+plugin.activemq.pool.1.port = 6163
+plugin.activemq.pool.1.user = me
+plugin.activemq.pool.1.password = secret
+
+plugin.activemq.pool.2.host = stomp2
+plugin.activemq.pool.2.port = 6163
+plugin.activemq.pool.2.user = me
+plugin.activemq.pool.2.password = secret
+{% endhighlight %}
+
+This gives it 2 servers to attempt to connect to, if the first one fails it 
will use the second.  Usernames and passwords can be set
+with the environment variables STOMP_USER, STOMP_PASSWORD.
+
+If you do not specify a port it will default to _61613_
+
+You can also specify the following options for the Stomp gem, these are the 
defaults in the Stomp 1.1.6 gem:
+
+{% highlight ini %}
+plugin.activemq.initial_reconnect_delay = 0.01
+plugin.activemq.max_reconnect_delay = 30.0
+plugin.activemq.use_exponential_back_off = true
+plugin.activemq.back_off_multiplier = 2
+plugin.activemq.max_reconnect_attempts = 0
+plugin.activemq.randomize = false
+plugin.activemq.timeout = -1
+{% endhighlight %}
+
+### Message Priority
+
+ActiveMQ messages support priorities, you can pass in the needed priority 
header by setting:
+
+{% highlight ini %}
+plugin.activemq.priority = 4
+{% endhighlight %}
-- 
1.7.1

-- 
You received this message because you are subscribed to the Google Groups 
"Puppet Developers" group.
To post to this group, send email to puppet-dev@googlegroups.com.
To unsubscribe from this group, send email to 
puppet-dev+unsubscr...@googlegroups.com.
For more options, visit this group at 
http://groups.google.com/group/puppet-dev?hl=en.

Reply via email to