This adds a new direct request method to the Message object
that will communicate with nodes directly.

With this feature enabled requests that goes out to a number
of nodes below a certain threshold (10 by default) will use
the new point to point mode while the rest will use broadcast.

This should be transparent to the end user.  But is disabled
by default

The idea with this is to enable communication with off-line
machines but also to optimize the communication where you need
to send a message to only 10 or so nodes and creating 10 messages
would be more efficient than sending a broadcast message to all of
1000s of nodes,

The other area where this is important is in the web space where
users can easily create arbitrary hosts lists by clicking on nodes.
In this scenario the grouping based on facts, classes etc does not
apply well.  Having a direct mode means you could supply arbitrary
hosts lists.

Ultimately this is the ground work for making discovery pluggable
and optional.  In the case of the web scenario above when the web
application provides a list of hostnames to communicate with the
mcollective infrastructure will not do a discovery but simply go
and communicate with those nodes.

This specific commit begins making the various internals aware of
this new mode, it adds 2 configuration options but leaves this mode
disabled by default.

The stomp connector will create queues per node to enable this,
doing it this way is pretty inefficient and wont work at scale
a new ActiveMQ connector will be created that use a much better
but ActiveMQ specific approach to solving the queues per server
problem.

Followup work needs to be done around request TTLs to avoid down
machines from doing lots of unintended work.

Signed-off-by: R.I.Pienaar <r...@devco.net>
---
Local-branch: feature/master/7988
 lib/mcollective/client.rb                          |   20 ++++++++---
 lib/mcollective/config.rb                          |    6 +++-
 lib/mcollective/message.rb                         |   23 +++++++++++-
 lib/mcollective/rpc/client.rb                      |   10 ++++-
 plugins/mcollective/connector/stomp.rb             |   38 +++++++++++++++-----
 spec/unit/message_spec.rb                          |   37 +++++++++++++++++--
 .../plugins/mcollective/connector/stomp_spec.rb    |   31 ++++++++++++----
 website/changelog.md                               |    1 +
 website/reference/basic/configuration.md           |    5 ++-
 website/reference/plugins/connector_stomp.md       |   11 ++++++
 10 files changed, 151 insertions(+), 31 deletions(-)

diff --git a/lib/mcollective/client.rb b/lib/mcollective/client.rb
index ee4918a..35a375e 100644
--- a/lib/mcollective/client.rb
+++ b/lib/mcollective/client.rb
@@ -38,7 +38,13 @@ module MCollective
         # Sends a request and returns the generated request id, doesn't wait 
for
         # responses and doesn't execute any passed in code blocks for responses
         def sendreq(msg, agent, filter = {})
-            request = Message.new(msg, nil, {:agent => agent, :type => 
:request, :collective => collective, :filter => filter})
+            if msg.is_a?(Message)
+                request = msg
+                agent = request.agent
+            else
+                request = Message.new(msg, nil, {:agent => agent, :type => 
:request, :collective => collective, :filter => filter})
+            end
+
             request.encode!
 
             Log.debug("Sending request #{request.requestid} to the 
#{request.agent} agent in collective #{request.collective}")
@@ -51,9 +57,7 @@ module MCollective
                 @subscriptions[agent] = 1
             end
 
-            Timeout.timeout(2) do
-                request.publish
-            end
+            request.publish
 
             request.requestid
         end
@@ -117,7 +121,13 @@ module MCollective
         #
         # It returns a hash of times and timeouts for discovery and total run 
is taken from the options
         # hash which in turn is generally built using MCollective::Optionparser
-        def req(body, agent, options=false, waitfor=0)
+        def req(body, agent=nil, options=false, waitfor=0)
+            if body.is_a?(Message)
+                agent = body.agent
+                options = body.options
+                waitfor = body.discovered_hosts.size || 0
+            end
+
             stat = {:starttime => Time.now.to_f, :discoverytime => 0, 
:blocktime => 0, :totaltime => 0}
 
             options = @options unless options
diff --git a/lib/mcollective/config.rb b/lib/mcollective/config.rb
index c23aea9..d4b6443 100644
--- a/lib/mcollective/config.rb
+++ b/lib/mcollective/config.rb
@@ -9,7 +9,8 @@ module MCollective
                     :classesfile, :rpcauditprovider, :rpcaudit, :configdir, 
:rpcauthprovider,
                     :rpcauthorization, :color, :configfile, :rpchelptemplate, 
:rpclimitmethod,
                     :logger_type, :fact_cache_time, :collectives, 
:main_collective, :ssl_cipher,
-                    :registration_collective, :direct_addressing, :queueprefix
+                    :registration_collective, :direct_addressing, 
:direct_addressing_threshold,
+                    :queueprefix
 
         def initialize
             @configured = false
@@ -66,6 +67,8 @@ module MCollective
                                     @identity = val
                                 when "direct_addressing"
                                     val =~ /^1|y/i ? @direct_addressing = true 
: @direct_addressing = false
+                                when "direct_addressing_threshold"
+                                    @direct_addressing_threshold = val.to_i
                                 when "color"
                                     val =~ /^1|y/i ? @color = true : @color = 
false
                                 when "daemonize"
@@ -156,6 +159,7 @@ module MCollective
             @main_collective = @collectives.first
             @ssl_cipher = "aes-256-cbc"
             @direct_addressing = false
+            @direct_addressing_threshold = 10
         end
 
         def read_plugin_config_dir(dir)
diff --git a/lib/mcollective/message.rb b/lib/mcollective/message.rb
index 353cdc0..6a89554 100644
--- a/lib/mcollective/message.rb
+++ b/lib/mcollective/message.rb
@@ -2,7 +2,8 @@ module MCollective
     # container for a message, its headers, agent, collective and other meta 
data
     class Message
         attr_reader :payload, :message, :request
-        attr_accessor :headers, :agent, :collective, :type, :filter, :requestid
+        attr_accessor :headers, :agent, :collective, :type, :filter
+        attr_accessor :requestid, :discovered_hosts, :options
 
         # payload              - the message body without headers etc, just 
the text
         # message              - the original message received from the 
middleware
@@ -13,6 +14,7 @@ module MCollective
         # options[:type]       - an indicator about the type of message, 
:message, :request or :reply
         # options[:request]    - if this is a reply this should old the 
message we are replying to
         # options[:filter]     - for requests, the filter to encode into the 
message
+        # options[:options]    - the normal client options hash
         def initialize(payload, message, options = {})
             options = {:base64 => false,
                        :agent => nil,
@@ -20,16 +22,19 @@ module MCollective
                        :type => :message,
                        :request => nil,
                        :filter => Util.empty_filter,
+                       :options => false,
                        :collective => nil}.merge(options)
 
             @payload = payload
             @message = message
             @requestid = nil
+            @discovered_hosts = nil
 
             @type = options[:type]
             @headers = options[:headers]
             @base64 = options[:base64]
             @filter = options[:filter]
+            @options = options[:options]
 
             if options[:request]
                 @request = options[:request]
@@ -97,7 +102,21 @@ module MCollective
 
         # publish a reply message by creating a target name and sending it
         def publish
-            PluginManager["connector_plugin"].publish(self)
+            Timeout.timeout(2) do
+                # If we've been specificaly told about hosts that were 
discovered
+                # use that information to do P2P calls if appropriate else just
+                # send it as is.
+                if @discovered_hosts && Config.instance.direct_addressing
+                    if @discovered_hosts.size <= 
Config.instance.direct_addressing_threshold
+                        @type = :direct_request
+                        Log.debug("Handling #{requestid} as a direct request")
+                    end
+
+                    PluginManager["connector_plugin"].publish(self)
+                else
+                    PluginManager["connector_plugin"].publish(self)
+                end
+            end
         end
 
         def create_reqid
diff --git a/lib/mcollective/rpc/client.rb b/lib/mcollective/rpc/client.rb
index e3e61a6..74712a9 100644
--- a/lib/mcollective/rpc/client.rb
+++ b/lib/mcollective/rpc/client.rb
@@ -423,7 +423,10 @@ module MCollective
                 @ddl.validate_request(action, args) if @ddl
 
                 req = new_request(action.to_s, args)
-                return @client.sendreq(req, @agent, @filter)
+
+                message = Message.new(req, nil, {:agent => @agent, :type => 
:request, :collective => @collective, :filter => filter, :options => options})
+
+                return @client.sendreq(message, nil)
             end
 
             # Handles traditional calls to the remote agents with full stats
@@ -448,11 +451,14 @@ module MCollective
 
                 twirl = Progress.new
 
+                message = Message.new(req, nil, {:agent => @agent, :type => 
:request, :collective => @collective, :filter => filter, :options => opts})
+                message.discovered_hosts = disc.clone
+
                 result = []
                 respcount = 0
 
                 if disc.size > 0
-                    @client.req(req, @agent, opts, disc.size) do |resp|
+                    @client.req(message) do |resp|
                         respcount += 1
 
                         if block_given?
diff --git a/plugins/mcollective/connector/stomp.rb 
b/plugins/mcollective/connector/stomp.rb
index 365eee9..9b1ee87 100644
--- a/plugins/mcollective/connector/stomp.rb
+++ b/plugins/mcollective/connector/stomp.rb
@@ -176,15 +176,20 @@ module MCollective
             def publish(msg)
                 msg.base64_encode! if @base64
 
-                target = make_target(msg.agent, msg.type, msg.collective)
+                if msg.type == :direct_request
+                    msg.discovered_hosts.each do |node|
+                        target = make_target(msg.agent, msg.type, 
msg.collective, node)
 
-                Log.debug("Sending a message to Stomp target '#{target}'")
+                        Log.debug("Sending a direct message to STOMP target 
'#{target}'")
 
-                # deal with deprecation warnings in newer stomp gems
-                if @connection.respond_to?("publish")
-                    @connection.publish(target, msg.payload, msgheaders)
+                        publish_msg(target, msg.payload)
+                    end
                 else
-                    @connection.send(target, msg.payload, msgheaders)
+                    target = make_target(msg.agent, msg.type, msg.collective)
+
+                    Log.debug("Sending a broadcast message to STOMP target 
'#{target}'")
+
+                    publish_msg(target, msg.payload)
                 end
             end
 
@@ -199,6 +204,16 @@ module MCollective
                 end
             end
 
+            # Actually sends the message to the middleware
+            def publish_msg(target, msg)
+                # deal with deprecation warnings in newer stomp gems
+                if @connection.respond_to?("publish")
+                    @connection.publish(target, msg, msgheaders)
+                else
+                    @connection.send(target, msg, msgheaders)
+                end
+            end
+
             # Subscribe to a topic or queue
             def unsubscribe(agent, type, collective)
                 source = make_target(agent, type, collective)
@@ -258,8 +273,8 @@ module MCollective
                 end
             end
 
-            def make_target(agent, type, collective)
-                raise("Unknown target type #{type}") unless [:directed, 
:broadcast, :reply, :request].include?(type)
+            def make_target(agent, type, collective, target_node=nil)
+                raise("Unknown target type #{type}") unless [:directed, 
:broadcast, :reply, :request, :direct_request].include?(type)
                 raise("Unknown collective '#{collective}' known collectives 
are '#{@config.collectives.join ', '}'") unless 
@config.collectives.include?(collective)
 
                 prefix = @config.topicprefix
@@ -271,14 +286,19 @@ module MCollective
                         suffix = :command
                     when :request
                         suffix = :command
+                    when :direct_request
+                        agent = nil
+                        prefix = @config.queueprefix
+                        suffix = Digest::MD5.hexdigest(target_node)
                     when :directed
+                        agent = nil
                         prefix = @config.queueprefix
                         # use a md5 since hostnames might have illegal 
characters that
                         # the middleware dont understand
                         suffix = Digest::MD5.hexdigest(@config.identity)
                 end
 
-                ["#{prefix}#{collective}", agent, 
suffix].join(@config.topicsep)
+                ["#{prefix}#{collective}", agent, 
suffix].compact.join(@config.topicsep)
             end
         end
     end
diff --git a/spec/unit/message_spec.rb b/spec/unit/message_spec.rb
index 0d34831..2ba4756 100644
--- a/spec/unit/message_spec.rb
+++ b/spec/unit/message_spec.rb
@@ -4,9 +4,6 @@ require File.dirname(__FILE__) + '/../spec_helper'
 
 module MCollective
     describe Message do
-        before do
-        end
-
         describe "#initialize" do
             it "should set defaults" do
                 m = Message.new("payload", "message")
@@ -20,6 +17,8 @@ module MCollective
                 m.filter.should == Util.empty_filter
                 m.requestid.should == nil
                 m.base64?.should == false
+                m.options.should == false
+                m.discovered_hosts.should == nil
             end
 
             it "should set all supplied options" do
@@ -30,6 +29,7 @@ module MCollective
                                                       :headers => {:rspec => 
"test"},
                                                       :type => :rspec,
                                                       :filter => "filter",
+                                                      :options => "options",
                                                       :collective => 
"collective")
                 m.payload.should == "payload"
                 m.message.should == "message"
@@ -40,6 +40,7 @@ module MCollective
                 m.type.should == :rspec
                 m.filter.should == "filter"
                 m.base64?.should == true
+                m.options.should == "options"
             end
 
             it "if given a request it should set options based on the request" 
do
@@ -209,6 +210,36 @@ module MCollective
 
                 m.publish
             end
+
+            it "should support direct addressing" do
+                m = Message.new("msg", "message", :type => :request)
+                m.discovered_hosts = ["one", "two", "three"]
+
+                Config.any_instance.expects(:direct_addressing).returns(true)
+                
Config.any_instance.expects(:direct_addressing_threshold).returns(10)
+
+                connector = mock
+                connector.expects(:publish).with(m)
+                PluginManager.expects("[]").returns(connector)
+
+                m.publish
+                m.type.should == :direct_request
+            end
+
+            it "should only direct publish below the configured threshold" do
+                m = Message.new("msg", "message", :type => :request)
+                m.discovered_hosts = ["one", "two", "three"]
+
+                Config.any_instance.expects(:direct_addressing).returns(true)
+                
Config.any_instance.expects(:direct_addressing_threshold).returns(1)
+
+                connector = mock
+                connector.expects(:publish).with(m)
+                PluginManager.expects("[]").returns(connector)
+
+                m.publish
+                m.type.should == :request
+            end
         end
 
         describe "#create_reqid" do
diff --git a/spec/unit/plugins/mcollective/connector/stomp_spec.rb 
b/spec/unit/plugins/mcollective/connector/stomp_spec.rb
index a1c0348..88bcfd5 100644
--- a/spec/unit/plugins/mcollective/connector/stomp_spec.rb
+++ b/spec/unit/plugins/mcollective/connector/stomp_spec.rb
@@ -176,39 +176,54 @@ module MCollective
                     @c.publish(@msg)
                 end
 
+                it "should publish direct requests for each discovered host" do
+                    @msg.expects(:type).returns(:direct_request).times(3)
+                    @msg.expects(:discovered_hosts).returns(["one", "two"])
+
+                    @c.expects(:make_target).with("agent", :direct_request, 
"mcollective", "one").returns("target_one")
+                    @c.expects(:make_target).with("agent", :direct_request, 
"mcollective", "two").returns("target_two")
+
+                    @c.expects(:publish_msg).with("target_one", "msg")
+                    @c.expects(:publish_msg).with("target_two", "msg")
+
+                    @c.publish(@msg)
+                end
+            end
+
+            describe "#publish_msg" do
                 it "should use the publish method if it exists" do
+                    
@connection.expects("respond_to?").with("publish").returns(true)
                     @connection.expects(:publish).with("test", "msg", {}).once
                     @c.stubs(:msgheaders).returns({})
-                    @c.expects(:make_target).returns("test")
 
-                    @c.publish(@msg)
+                    @c.publish_msg("test", "msg")
                 end
 
                 it "should use the send method if publish does not exist" do
                     
@connection.expects("respond_to?").with('publish').returns(false)
                     @connection.expects(:send).with("test", "msg", {}).once
                     @c.stubs(:msgheaders).returns({})
-                    @c.expects(:make_target).returns("test")
 
-                    @c.publish(@msg)
+                    @c.publish_msg("test", "msg")
                 end
 
                 it "should publish the correct message to the correct target 
with msgheaders" do
+                    
@connection.expects("respond_to?").with("publish").returns(true)
                     @connection.expects(:publish).with("test", "msg", {"test" 
=> "test"}).once
                     @c.expects(:msgheaders).returns({"test" => "test"})
-                    @c.expects(:make_target).returns("test")
 
-                    @c.publish(@msg)
+                    @c.publish_msg("test", "msg")
                 end
 
             end
 
             describe "#make_target" do
                 it "should create correct targets" do
-                    @config.expects(:queueprefix).returns("/queue/")
+                    @config.expects(:queueprefix).returns("/queue/").twice
 
                     @c.make_target("test", :broadcast, "mcollective").should 
== "/topic/mcollective.test.command"
-                    @c.make_target("test", :directed, "mcollective").should == 
"/queue/mcollective.test.2bc84dc69b73db9383b9c6711d2011b7"
+                    @c.make_target("test", :directed, "mcollective").should == 
"/queue/mcollective.2bc84dc69b73db9383b9c6711d2011b7"
+                    @c.make_target("test", :direct_request, "mcollective", 
"rspec").should == "/queue/mcollective.2bc84dc69b73db9383b9c6711d2011b7"
                     @c.make_target("test", :reply, "mcollective").should == 
"/topic/mcollective.test.reply"
                     @c.make_target("test", :request, "mcollective").should == 
"/topic/mcollective.test.command"
                 end
diff --git a/website/changelog.md b/website/changelog.md
index 2b4b876..82e14bc 100644
--- a/website/changelog.md
+++ b/website/changelog.md
@@ -11,6 +11,7 @@ title: Changelog
 
 |Date|Description|Ticket|
 |----|-----------|------|
+|2011/06/30|Add the ability to do point to point comms for requests affecting 
small numbers of hosts|7988|
 |2011/06/21|Add support for Stomp Gem version 1.1.9 callback based 
logging|7960|
 |2011/06/21|On the server side log missing DDL files at debug and not warning 
level|7961|
 |2011/06/16|Add the ability for nodes to subscribe to per-node queues, off by 
default|7225|
diff --git a/website/reference/basic/configuration.md 
b/website/reference/basic/configuration.md
index 9fb5f3b..56a58e4 100644
--- a/website/reference/basic/configuration.md
+++ b/website/reference/basic/configuration.md
@@ -28,7 +28,8 @@ Configuration is a simple *key = val* style configuration 
file.
 
 |Key|Sample|Description|
 |---|------|-----------|
-|topicprefix|/topic/mcollective|Prefix that gets used for all messages.  Post 
1.1.3 this should just be /topic/|
+|topicprefix|/topic/|Prefix that gets used for all messages.|
+|queueprefix|/queue/|Prefix that gets used for all queued messages.|
 |topicnamesep|.|The seperator to use between parts of the topic path|
 |collectives|mcollective,subcollective|A list of [Subcollectives] to join - 
1.1.3 and newer only|
 |main_collective|mcollective|The main collective to target - 1.1.3 and newer 
only|
@@ -43,6 +44,8 @@ Configuration is a simple *key = val* style configuration 
file.
 |rpchelptemplate|/etc/mcollective/rpc-help.erb|The path to the erb template 
used for generating help|
 |logger_type|file|Valid logger types, currently file, syslog or console|
 |ssl_cipher|aes-256-cbc|This sets the cipher in use by the SSL code, see _man 
enc_ for a list supported by OpenSSL|
+|direct_addressing|n|Enable or disable directed requests|
+|direct_addressing_threshold|10|When direct requests are enabled, send direct 
messages for less than or equal to this many hosts|
 
 ## Server Configuration
 The server configuration file should be root only readable
diff --git a/website/reference/plugins/connector_stomp.md 
b/website/reference/plugins/connector_stomp.md
index 32cfc9b..fe579b8 100644
--- a/website/reference/plugins/connector_stomp.md
+++ b/website/reference/plugins/connector_stomp.md
@@ -11,6 +11,17 @@ The stomp connector uses the [STOMP] rubygem to connect to 
compatible servers.
 
 This code will only work with version _1.1_ and _1.1.6_ or newer of the Stomp 
gem, the in between versions have threading issues.
 
+As this connector tries to be as generic as possible it is hard to support all 
the advanced features of MCollective using it.  We do not recommend you use the 
directed mode
+using this plugin, instead look towards specific ones written for ActiveMQ or 
your chosen middleware.
+
+## Middleware Layout
+
+For broadcast messages this connector will create _topics_ with names like 
_/topic/&lt;collective&gt;.&lt;agent&gt;.command_ and replies will go to
+_/topic/&lt;collective&gt;.&lt;agent&gt;.reply_
+
+For directed messages it will create queues with names like 
_/queue/&lt;collective&gt;.mcollective.&lt;md5 hash of identity&gt;_.
+
+You should configure appropriate ACLs on your middleware to allow this scheme
 
 ## Configuring
 
-- 
1.7.1

-- 
You received this message because you are subscribed to the Google Groups 
"Puppet Developers" group.
To post to this group, send email to puppet-dev@googlegroups.com.
To unsubscribe from this group, send email to 
puppet-dev+unsubscr...@googlegroups.com.
For more options, visit this group at 
http://groups.google.com/group/puppet-dev?hl=en.

Reply via email to