This adds a new direct request method to the Message object that will communicate with nodes directly.
With this feature enabled requests that goes out to a number of nodes below a certain threshold (10 by default) will use the new point to point mode while the rest will use broadcast. This should be transparent to the end user. But is disabled by default The idea with this is to enable communication with off-line machines but also to optimize the communication where you need to send a message to only 10 or so nodes and creating 10 messages would be more efficient than sending a broadcast message to all of 1000s of nodes, The other area where this is important is in the web space where users can easily create arbitrary hosts lists by clicking on nodes. In this scenario the grouping based on facts, classes etc does not apply well. Having a direct mode means you could supply arbitrary hosts lists. Ultimately this is the ground work for making discovery pluggable and optional. In the case of the web scenario above when the web application provides a list of hostnames to communicate with the mcollective infrastructure will not do a discovery but simply go and communicate with those nodes. This specific commit begins making the various internals aware of this new mode, it adds 2 configuration options but leaves this mode disabled by default. The stomp connector will create queues per node to enable this, doing it this way is pretty inefficient and wont work at scale a new ActiveMQ connector will be created that use a much better but ActiveMQ specific approach to solving the queues per server problem. Followup work needs to be done around request TTLs to avoid down machines from doing lots of unintended work. Signed-off-by: R.I.Pienaar <r...@devco.net> --- Local-branch: feature/master/7988 lib/mcollective/client.rb | 20 ++++++++--- lib/mcollective/config.rb | 6 +++- lib/mcollective/message.rb | 23 +++++++++++- lib/mcollective/rpc/client.rb | 10 ++++- plugins/mcollective/connector/stomp.rb | 38 +++++++++++++++----- spec/unit/message_spec.rb | 37 +++++++++++++++++-- .../plugins/mcollective/connector/stomp_spec.rb | 31 ++++++++++++---- website/changelog.md | 1 + website/reference/basic/configuration.md | 5 ++- website/reference/plugins/connector_stomp.md | 11 ++++++ 10 files changed, 151 insertions(+), 31 deletions(-) diff --git a/lib/mcollective/client.rb b/lib/mcollective/client.rb index ee4918a..35a375e 100644 --- a/lib/mcollective/client.rb +++ b/lib/mcollective/client.rb @@ -38,7 +38,13 @@ module MCollective # Sends a request and returns the generated request id, doesn't wait for # responses and doesn't execute any passed in code blocks for responses def sendreq(msg, agent, filter = {}) - request = Message.new(msg, nil, {:agent => agent, :type => :request, :collective => collective, :filter => filter}) + if msg.is_a?(Message) + request = msg + agent = request.agent + else + request = Message.new(msg, nil, {:agent => agent, :type => :request, :collective => collective, :filter => filter}) + end + request.encode! Log.debug("Sending request #{request.requestid} to the #{request.agent} agent in collective #{request.collective}") @@ -51,9 +57,7 @@ module MCollective @subscriptions[agent] = 1 end - Timeout.timeout(2) do - request.publish - end + request.publish request.requestid end @@ -117,7 +121,13 @@ module MCollective # # It returns a hash of times and timeouts for discovery and total run is taken from the options # hash which in turn is generally built using MCollective::Optionparser - def req(body, agent, options=false, waitfor=0) + def req(body, agent=nil, options=false, waitfor=0) + if body.is_a?(Message) + agent = body.agent + options = body.options + waitfor = body.discovered_hosts.size || 0 + end + stat = {:starttime => Time.now.to_f, :discoverytime => 0, :blocktime => 0, :totaltime => 0} options = @options unless options diff --git a/lib/mcollective/config.rb b/lib/mcollective/config.rb index c23aea9..d4b6443 100644 --- a/lib/mcollective/config.rb +++ b/lib/mcollective/config.rb @@ -9,7 +9,8 @@ module MCollective :classesfile, :rpcauditprovider, :rpcaudit, :configdir, :rpcauthprovider, :rpcauthorization, :color, :configfile, :rpchelptemplate, :rpclimitmethod, :logger_type, :fact_cache_time, :collectives, :main_collective, :ssl_cipher, - :registration_collective, :direct_addressing, :queueprefix + :registration_collective, :direct_addressing, :direct_addressing_threshold, + :queueprefix def initialize @configured = false @@ -66,6 +67,8 @@ module MCollective @identity = val when "direct_addressing" val =~ /^1|y/i ? @direct_addressing = true : @direct_addressing = false + when "direct_addressing_threshold" + @direct_addressing_threshold = val.to_i when "color" val =~ /^1|y/i ? @color = true : @color = false when "daemonize" @@ -156,6 +159,7 @@ module MCollective @main_collective = @collectives.first @ssl_cipher = "aes-256-cbc" @direct_addressing = false + @direct_addressing_threshold = 10 end def read_plugin_config_dir(dir) diff --git a/lib/mcollective/message.rb b/lib/mcollective/message.rb index 353cdc0..6a89554 100644 --- a/lib/mcollective/message.rb +++ b/lib/mcollective/message.rb @@ -2,7 +2,8 @@ module MCollective # container for a message, its headers, agent, collective and other meta data class Message attr_reader :payload, :message, :request - attr_accessor :headers, :agent, :collective, :type, :filter, :requestid + attr_accessor :headers, :agent, :collective, :type, :filter + attr_accessor :requestid, :discovered_hosts, :options # payload - the message body without headers etc, just the text # message - the original message received from the middleware @@ -13,6 +14,7 @@ module MCollective # options[:type] - an indicator about the type of message, :message, :request or :reply # options[:request] - if this is a reply this should old the message we are replying to # options[:filter] - for requests, the filter to encode into the message + # options[:options] - the normal client options hash def initialize(payload, message, options = {}) options = {:base64 => false, :agent => nil, @@ -20,16 +22,19 @@ module MCollective :type => :message, :request => nil, :filter => Util.empty_filter, + :options => false, :collective => nil}.merge(options) @payload = payload @message = message @requestid = nil + @discovered_hosts = nil @type = options[:type] @headers = options[:headers] @base64 = options[:base64] @filter = options[:filter] + @options = options[:options] if options[:request] @request = options[:request] @@ -97,7 +102,21 @@ module MCollective # publish a reply message by creating a target name and sending it def publish - PluginManager["connector_plugin"].publish(self) + Timeout.timeout(2) do + # If we've been specificaly told about hosts that were discovered + # use that information to do P2P calls if appropriate else just + # send it as is. + if @discovered_hosts && Config.instance.direct_addressing + if @discovered_hosts.size <= Config.instance.direct_addressing_threshold + @type = :direct_request + Log.debug("Handling #{requestid} as a direct request") + end + + PluginManager["connector_plugin"].publish(self) + else + PluginManager["connector_plugin"].publish(self) + end + end end def create_reqid diff --git a/lib/mcollective/rpc/client.rb b/lib/mcollective/rpc/client.rb index e3e61a6..74712a9 100644 --- a/lib/mcollective/rpc/client.rb +++ b/lib/mcollective/rpc/client.rb @@ -423,7 +423,10 @@ module MCollective @ddl.validate_request(action, args) if @ddl req = new_request(action.to_s, args) - return @client.sendreq(req, @agent, @filter) + + message = Message.new(req, nil, {:agent => @agent, :type => :request, :collective => @collective, :filter => filter, :options => options}) + + return @client.sendreq(message, nil) end # Handles traditional calls to the remote agents with full stats @@ -448,11 +451,14 @@ module MCollective twirl = Progress.new + message = Message.new(req, nil, {:agent => @agent, :type => :request, :collective => @collective, :filter => filter, :options => opts}) + message.discovered_hosts = disc.clone + result = [] respcount = 0 if disc.size > 0 - @client.req(req, @agent, opts, disc.size) do |resp| + @client.req(message) do |resp| respcount += 1 if block_given? diff --git a/plugins/mcollective/connector/stomp.rb b/plugins/mcollective/connector/stomp.rb index 365eee9..9b1ee87 100644 --- a/plugins/mcollective/connector/stomp.rb +++ b/plugins/mcollective/connector/stomp.rb @@ -176,15 +176,20 @@ module MCollective def publish(msg) msg.base64_encode! if @base64 - target = make_target(msg.agent, msg.type, msg.collective) + if msg.type == :direct_request + msg.discovered_hosts.each do |node| + target = make_target(msg.agent, msg.type, msg.collective, node) - Log.debug("Sending a message to Stomp target '#{target}'") + Log.debug("Sending a direct message to STOMP target '#{target}'") - # deal with deprecation warnings in newer stomp gems - if @connection.respond_to?("publish") - @connection.publish(target, msg.payload, msgheaders) + publish_msg(target, msg.payload) + end else - @connection.send(target, msg.payload, msgheaders) + target = make_target(msg.agent, msg.type, msg.collective) + + Log.debug("Sending a broadcast message to STOMP target '#{target}'") + + publish_msg(target, msg.payload) end end @@ -199,6 +204,16 @@ module MCollective end end + # Actually sends the message to the middleware + def publish_msg(target, msg) + # deal with deprecation warnings in newer stomp gems + if @connection.respond_to?("publish") + @connection.publish(target, msg, msgheaders) + else + @connection.send(target, msg, msgheaders) + end + end + # Subscribe to a topic or queue def unsubscribe(agent, type, collective) source = make_target(agent, type, collective) @@ -258,8 +273,8 @@ module MCollective end end - def make_target(agent, type, collective) - raise("Unknown target type #{type}") unless [:directed, :broadcast, :reply, :request].include?(type) + def make_target(agent, type, collective, target_node=nil) + raise("Unknown target type #{type}") unless [:directed, :broadcast, :reply, :request, :direct_request].include?(type) raise("Unknown collective '#{collective}' known collectives are '#{@config.collectives.join ', '}'") unless @config.collectives.include?(collective) prefix = @config.topicprefix @@ -271,14 +286,19 @@ module MCollective suffix = :command when :request suffix = :command + when :direct_request + agent = nil + prefix = @config.queueprefix + suffix = Digest::MD5.hexdigest(target_node) when :directed + agent = nil prefix = @config.queueprefix # use a md5 since hostnames might have illegal characters that # the middleware dont understand suffix = Digest::MD5.hexdigest(@config.identity) end - ["#{prefix}#{collective}", agent, suffix].join(@config.topicsep) + ["#{prefix}#{collective}", agent, suffix].compact.join(@config.topicsep) end end end diff --git a/spec/unit/message_spec.rb b/spec/unit/message_spec.rb index 0d34831..2ba4756 100644 --- a/spec/unit/message_spec.rb +++ b/spec/unit/message_spec.rb @@ -4,9 +4,6 @@ require File.dirname(__FILE__) + '/../spec_helper' module MCollective describe Message do - before do - end - describe "#initialize" do it "should set defaults" do m = Message.new("payload", "message") @@ -20,6 +17,8 @@ module MCollective m.filter.should == Util.empty_filter m.requestid.should == nil m.base64?.should == false + m.options.should == false + m.discovered_hosts.should == nil end it "should set all supplied options" do @@ -30,6 +29,7 @@ module MCollective :headers => {:rspec => "test"}, :type => :rspec, :filter => "filter", + :options => "options", :collective => "collective") m.payload.should == "payload" m.message.should == "message" @@ -40,6 +40,7 @@ module MCollective m.type.should == :rspec m.filter.should == "filter" m.base64?.should == true + m.options.should == "options" end it "if given a request it should set options based on the request" do @@ -209,6 +210,36 @@ module MCollective m.publish end + + it "should support direct addressing" do + m = Message.new("msg", "message", :type => :request) + m.discovered_hosts = ["one", "two", "three"] + + Config.any_instance.expects(:direct_addressing).returns(true) + Config.any_instance.expects(:direct_addressing_threshold).returns(10) + + connector = mock + connector.expects(:publish).with(m) + PluginManager.expects("[]").returns(connector) + + m.publish + m.type.should == :direct_request + end + + it "should only direct publish below the configured threshold" do + m = Message.new("msg", "message", :type => :request) + m.discovered_hosts = ["one", "two", "three"] + + Config.any_instance.expects(:direct_addressing).returns(true) + Config.any_instance.expects(:direct_addressing_threshold).returns(1) + + connector = mock + connector.expects(:publish).with(m) + PluginManager.expects("[]").returns(connector) + + m.publish + m.type.should == :request + end end describe "#create_reqid" do diff --git a/spec/unit/plugins/mcollective/connector/stomp_spec.rb b/spec/unit/plugins/mcollective/connector/stomp_spec.rb index a1c0348..88bcfd5 100644 --- a/spec/unit/plugins/mcollective/connector/stomp_spec.rb +++ b/spec/unit/plugins/mcollective/connector/stomp_spec.rb @@ -176,39 +176,54 @@ module MCollective @c.publish(@msg) end + it "should publish direct requests for each discovered host" do + @msg.expects(:type).returns(:direct_request).times(3) + @msg.expects(:discovered_hosts).returns(["one", "two"]) + + @c.expects(:make_target).with("agent", :direct_request, "mcollective", "one").returns("target_one") + @c.expects(:make_target).with("agent", :direct_request, "mcollective", "two").returns("target_two") + + @c.expects(:publish_msg).with("target_one", "msg") + @c.expects(:publish_msg).with("target_two", "msg") + + @c.publish(@msg) + end + end + + describe "#publish_msg" do it "should use the publish method if it exists" do + @connection.expects("respond_to?").with("publish").returns(true) @connection.expects(:publish).with("test", "msg", {}).once @c.stubs(:msgheaders).returns({}) - @c.expects(:make_target).returns("test") - @c.publish(@msg) + @c.publish_msg("test", "msg") end it "should use the send method if publish does not exist" do @connection.expects("respond_to?").with('publish').returns(false) @connection.expects(:send).with("test", "msg", {}).once @c.stubs(:msgheaders).returns({}) - @c.expects(:make_target).returns("test") - @c.publish(@msg) + @c.publish_msg("test", "msg") end it "should publish the correct message to the correct target with msgheaders" do + @connection.expects("respond_to?").with("publish").returns(true) @connection.expects(:publish).with("test", "msg", {"test" => "test"}).once @c.expects(:msgheaders).returns({"test" => "test"}) - @c.expects(:make_target).returns("test") - @c.publish(@msg) + @c.publish_msg("test", "msg") end end describe "#make_target" do it "should create correct targets" do - @config.expects(:queueprefix).returns("/queue/") + @config.expects(:queueprefix).returns("/queue/").twice @c.make_target("test", :broadcast, "mcollective").should == "/topic/mcollective.test.command" - @c.make_target("test", :directed, "mcollective").should == "/queue/mcollective.test.2bc84dc69b73db9383b9c6711d2011b7" + @c.make_target("test", :directed, "mcollective").should == "/queue/mcollective.2bc84dc69b73db9383b9c6711d2011b7" + @c.make_target("test", :direct_request, "mcollective", "rspec").should == "/queue/mcollective.2bc84dc69b73db9383b9c6711d2011b7" @c.make_target("test", :reply, "mcollective").should == "/topic/mcollective.test.reply" @c.make_target("test", :request, "mcollective").should == "/topic/mcollective.test.command" end diff --git a/website/changelog.md b/website/changelog.md index 2b4b876..82e14bc 100644 --- a/website/changelog.md +++ b/website/changelog.md @@ -11,6 +11,7 @@ title: Changelog |Date|Description|Ticket| |----|-----------|------| +|2011/06/30|Add the ability to do point to point comms for requests affecting small numbers of hosts|7988| |2011/06/21|Add support for Stomp Gem version 1.1.9 callback based logging|7960| |2011/06/21|On the server side log missing DDL files at debug and not warning level|7961| |2011/06/16|Add the ability for nodes to subscribe to per-node queues, off by default|7225| diff --git a/website/reference/basic/configuration.md b/website/reference/basic/configuration.md index 9fb5f3b..56a58e4 100644 --- a/website/reference/basic/configuration.md +++ b/website/reference/basic/configuration.md @@ -28,7 +28,8 @@ Configuration is a simple *key = val* style configuration file. |Key|Sample|Description| |---|------|-----------| -|topicprefix|/topic/mcollective|Prefix that gets used for all messages. Post 1.1.3 this should just be /topic/| +|topicprefix|/topic/|Prefix that gets used for all messages.| +|queueprefix|/queue/|Prefix that gets used for all queued messages.| |topicnamesep|.|The seperator to use between parts of the topic path| |collectives|mcollective,subcollective|A list of [Subcollectives] to join - 1.1.3 and newer only| |main_collective|mcollective|The main collective to target - 1.1.3 and newer only| @@ -43,6 +44,8 @@ Configuration is a simple *key = val* style configuration file. |rpchelptemplate|/etc/mcollective/rpc-help.erb|The path to the erb template used for generating help| |logger_type|file|Valid logger types, currently file, syslog or console| |ssl_cipher|aes-256-cbc|This sets the cipher in use by the SSL code, see _man enc_ for a list supported by OpenSSL| +|direct_addressing|n|Enable or disable directed requests| +|direct_addressing_threshold|10|When direct requests are enabled, send direct messages for less than or equal to this many hosts| ## Server Configuration The server configuration file should be root only readable diff --git a/website/reference/plugins/connector_stomp.md b/website/reference/plugins/connector_stomp.md index 32cfc9b..fe579b8 100644 --- a/website/reference/plugins/connector_stomp.md +++ b/website/reference/plugins/connector_stomp.md @@ -11,6 +11,17 @@ The stomp connector uses the [STOMP] rubygem to connect to compatible servers. This code will only work with version _1.1_ and _1.1.6_ or newer of the Stomp gem, the in between versions have threading issues. +As this connector tries to be as generic as possible it is hard to support all the advanced features of MCollective using it. We do not recommend you use the directed mode +using this plugin, instead look towards specific ones written for ActiveMQ or your chosen middleware. + +## Middleware Layout + +For broadcast messages this connector will create _topics_ with names like _/topic/<collective>.<agent>.command_ and replies will go to +_/topic/<collective>.<agent>.reply_ + +For directed messages it will create queues with names like _/queue/<collective>.mcollective.<md5 hash of identity>_. + +You should configure appropriate ACLs on your middleware to allow this scheme ## Configuring -- 1.7.1 -- You received this message because you are subscribed to the Google Groups "Puppet Developers" group. To post to this group, send email to puppet-dev@googlegroups.com. To unsubscribe from this group, send email to puppet-dev+unsubscr...@googlegroups.com. For more options, visit this group at http://groups.google.com/group/puppet-dev?hl=en.