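This patch gives Puppet the ability to find a puppet master via SRV
  records in DNS.  Unless a server has been set explicitly in puppet.conf
  (or supplied on the command line), Puppet first looks up the SRV records
  named by srv_record and tries the servers they advertise in priority
  order (weighted-random within a priority); only if none of them can be
  reached does it fall back to treating the server parameter as a regular
  host.  In effect, this patch adds client-side load-balancing.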

  * Adds 2 new configuration variables:

    use_srv_records: Whether to look up SRV records for the name
                     given in srv_record (default: true)
    srv_record: The DNS name that is queried for SRV records
                (default: _puppet._tcp.$domain)

  * Adds a hook to the 'server' config variable that sets
    use_srv_records to false when the server is configured manually.

Signed-off-by: Andrew J. Forgue <[email protected]>
---
 lib/puppet/defaults.rb                             |   11 +++-
 lib/puppet/indirector/rest.rb                      |   64 ++++++++++++++++---
 lib/puppet/network/resolver.rb                     |   59 ++++++++++++++++++
 lib/puppet/type/file/content.rb                    |   28 ++++++++-
 lib/puppet/type/file/source.rb                     |    4 +
 .../indirector/bucket_file/rest_spec.rb            |    1 +
 .../indirector/certificate_request/rest_spec.rb    |    1 +
 spec/unit/network/resolver_spec.rb                 |   64 ++++++++++++++++++++
 spec/unit/type/file/content_spec.rb                |    8 ++-
 9 files changed, 224 insertions(+), 16 deletions(-)
 create mode 100644 lib/puppet/network/resolver.rb
 create mode 100755 spec/unit/network/resolver_spec.rb

diff --git a/lib/puppet/defaults.rb b/lib/puppet/defaults.rb
index 4521a59..05e2d9f 100644
--- a/lib/puppet/defaults.rb
+++ b/lib/puppet/defaults.rb
@@ -497,7 +497,16 @@ module Puppet
       :mode => 0640,
       :desc => "The log file for puppet agent.  This is generally not used."
     },
-    :server => ["puppet", "The server to which server puppet agent should 
connect"],
+    :server => { 
+      :default => "puppet",
+      :desc => "The server to which server puppet agent should connect",
+      :call_on_define => false, 
+      :hook => proc do
+        Puppet.settings[:use_srv_records] = false
+      end
+    },
+    :use_srv_records => [true, "Whether the server will search for SRV records 
in DNS for the current domain"],
+    :srv_record => [ "_puppet._tcp.#{domain}", "The default SRV record which 
will be queried to find a server"],
     :ignoreschedules => [false,
       "Boolean; whether puppet agent should ignore schedules.  This is useful
       for initial puppet agent runs."],
diff --git a/lib/puppet/indirector/rest.rb b/lib/puppet/indirector/rest.rb
index eb41ff3..a99d367 100644
--- a/lib/puppet/indirector/rest.rb
+++ b/lib/puppet/indirector/rest.rb
@@ -4,6 +4,7 @@ require 'uri'
 require 'puppet/network/http_pool'
 require 'puppet/network/http/api/v1'
 require 'puppet/network/http/compression'
+require 'puppet/network/resolver'
 
 # Access objects via REST
 class Puppet::Indirector::REST < Puppet::Indirector::Terminus
@@ -19,19 +20,46 @@ class Puppet::Indirector::REST < 
Puppet::Indirector::Terminus
     @server_setting = setting
   end
 
-  def self.server
-    Puppet.settings[server_setting || :server]
-  end
-
   # Specify the setting that we should use to get the port.
   def self.use_port_setting(setting)
     @port_setting = setting
   end
 
+  def self.server
+    Puppet.settings[server_setting || :server]
+  end
+
   def self.port
     Puppet.settings[port_setting || :masterport].to_i
   end
 
+  def resolve(request)
+    if !Puppet.settings[:use_srv_records] or request.server or 
self.class.server_setting or self.class.port_setting
+      request.server ||= self.class.server
+      request.port ||= self.class.port
+      yield request
+      return
+    end
+
+    # Finally, find a working SRV record, if none ...
+    Puppet::Network::Resolver.by_srv(Puppet.settings[:srv_record]) do 
|srv_server, srv_port|
+      begin
+        request.server = srv_server
+        request.port   = srv_port
+        yield request
+        return
+      rescue SystemCallError => e
+        Puppet.warning "Error connecting to #{srv_server}:#{srv_port}: 
#{e.message}"
+      end
+    end
+
+    # ... Fall back onto the default server.
+    Puppet.debug "No more servers left, falling back to #{self.class.server}"
+    request.server = self.class.server
+    request.port = self.class.port
+    yield request
+  end
+
   # Figure out the content type, turn that into a format, and use the format
   # to extract the body of the response.
   def deserialize(response, multiple = false)
@@ -68,26 +96,42 @@ class Puppet::Indirector::REST < 
Puppet::Indirector::Terminus
   end
 
   def find(request)
-    return nil unless result = 
deserialize(network(request).get(indirection2uri(request), headers))
+    result = nil
+
+    resolve(request) do |request|
+      result = deserialize(network(request).get(indirection2uri(request), 
headers)) 
+    end
+      
+    return nil unless result
     result.name = request.key if result.respond_to?(:name=)
+
     result
   end
 
   def search(request)
-    unless result = deserialize(network(request).get(indirection2uri(request), 
headers), true)
-      return []
+    result = nil
+
+    resolve(request) do |request| 
+      result = deserialize(network(request).get(indirection2uri(request), 
headers), true)
     end
-    result
+
+    result || []
   end
 
   def destroy(request)
     raise ArgumentError, "DELETE does not accept options" unless 
request.options.empty?
-    deserialize network(request).delete(indirection2uri(request), headers)
+
+    resolve(request) do |request| 
+      return deserialize network(request).delete(indirection2uri(request), 
headers)
+    end
   end
 
   def save(request)
     raise ArgumentError, "PUT does not accept options" unless 
request.options.empty?
-    deserialize network(request).put(indirection2uri(request), 
request.instance.render, headers.merge({ "Content-Type" => 
request.instance.mime }))
+
+    resolve(request) do |request|
+      return deserialize network(request).put(indirection2uri(request), 
request.instance.render, headers.merge({ "Content-Type" => 
request.instance.mime }))
+    end
   end
 
   private
diff --git a/lib/puppet/network/resolver.rb b/lib/puppet/network/resolver.rb
new file mode 100644
index 0000000..168f295
--- /dev/null
+++ b/lib/puppet/network/resolver.rb
@@ -0,0 +1,59 @@
+require 'resolv'
+module Puppet::Network; end
+
+module Puppet::Network::Resolver
+  # Iterate through the list of servers that service this hostname
+  # and yield each server/port since SRV records have ports in them
+  # It will override whatever masterport setting is already set.
+  def self.by_srv(srv)
+    Puppet.debug "Searching for SRV records for #{srv}"
+
+    resolver = Resolv::DNS.new
+    records = resolver.getresources(srv, Resolv::DNS::Resource::IN::SRV)
+
+    Puppet.debug "Found #{records.size} SRV records."
+
+    each_priority(records) do |priority, records|
+      while next_rr = records.delete(find_weighted_server(records))
+        Puppet.debug "Yielding next server of 
#{next_rr.target.to_s}:#{next_rr.port}"
+        yield next_rr.target.to_s, next_rr.port
+      end
+    end
+  end
+
+  private
+
+  def self.each_priority(records)
+    pri_hash = records.inject({}) do |groups, element|
+      groups[element.priority] ||= []
+      groups[element.priority] << element
+      groups
+    end
+
+    pri_hash.keys.sort.each { |key| yield key, pri_hash[key] }
+  end
+    
+  def self.find_weighted_server(records)
+    return nil if records.nil? || records.empty?
+    return records.first if records.size == 1
+  
+    # Calculate the sum of all weights in the list of resource records,
+    # This is used to then select hosts until the weight exceeds what 
+    # random number we selected.  For example, if we have weights of 1 8 and 3:
+    #
+    # |-|---|--------|
+    #        ^ 
+    # We generate a random number 5, and iterate through the records, adding
+    # the current record's weight to the accumulator until the weight of the
+    # current record plus previous records is greater than the random number.
+    
+    total_weight = records.inject(0) { |sum,record| sum + record.weight }
+    current_weight = 0
+    chosen_weight = 1 + rand(total_weight)
+   
+    records.each do |record|
+      current_weight += record.weight
+      return record if current_weight >= chosen_weight
+    end
+  end
+end
diff --git a/lib/puppet/type/file/content.rb b/lib/puppet/type/file/content.rb
index b8f30a9..834d440 100755
--- a/lib/puppet/type/file/content.rb
+++ b/lib/puppet/type/file/content.rb
@@ -189,10 +189,34 @@ module Puppet
       end
     end
 
-    def chunk_file_from_source(source_or_content)
+
+    def get_from_source(source_or_content)
       request = Puppet::Indirector::Request.new(:file_content, :find, 
source_or_content.full_path.sub(/^\//,''))
+      if source_or_content.server? or !Puppet.settings[:use_srv_records]
+        connection = 
Puppet::Network::HttpPool.http_instance(source_or_content.server, 
source_or_content.port)
+        yield connection.get(indirection2uri(request), 
add_accept_encoding({"Accept" => "raw"}))
+        return
+      else
+        Puppet::Network::Resolver.by_srv(Puppet.settings[:srv_record]) do 
|server, port|
+          begin
+            connection = Puppet::Network::HttpPool.http_instance(server, port)
+            yield connection.get(indirection2uri(request), 
add_accept_encoding({"Accept" => "raw"}))
+            return
+          rescue SystemCallError => e
+            Puppet.warning "Error connecting to #{server}:#{port}: 
#{e.message}"
+          end
+        end
+      end
+     
+      # Fallback to hostname configured in :server
+      Puppet.debug "No more servers left, falling back to 
#{source_or_content.server}"
       connection = 
Puppet::Network::HttpPool.http_instance(source_or_content.server, 
source_or_content.port)
-      connection.request_get(indirection2uri(request), 
add_accept_encoding({"Accept" => "raw"})) do |response|
+      yield connection.get(indirection2uri(request), 
add_accept_encoding({"Accept" => "raw"}))
+    end
+
+
+    def chunk_file_from_source(source_or_content)
+      get_from_source(source_or_content) do |response|
         case response.code
         when "404"; nil
         when /^2/;  uncompress(response) { |uncompressor| response.read_body { 
|chunk| yield uncompressor.uncompress(chunk) } }
diff --git a/lib/puppet/type/file/source.rb b/lib/puppet/type/file/source.rb
index 7d03de2..f8c411e 100755
--- a/lib/puppet/type/file/source.rb
+++ b/lib/puppet/type/file/source.rb
@@ -180,6 +180,10 @@ module Puppet
       URI.unescape(uri.path) if found? and uri
     end
 
+    def server?
+       uri and uri.host
+    end
+
     def server
       (uri and uri.host) or Puppet.settings[:server]
     end
diff --git a/spec/integration/indirector/bucket_file/rest_spec.rb 
b/spec/integration/indirector/bucket_file/rest_spec.rb
index acfc059..0cc98be 100644
--- a/spec/integration/indirector/bucket_file/rest_spec.rb
+++ b/spec/integration/indirector/bucket_file/rest_spec.rb
@@ -20,6 +20,7 @@ describe "Filebucket REST Terminus" do
     Puppet.settings[:group] = Process.gid
     Puppet.settings[:server] = "127.0.0.1"
     Puppet.settings[:masterport] = "34343"
+    Puppet.settings[:use_srv_records] = false
 
     Puppet::Util::Cacher.expire
 
diff --git a/spec/integration/indirector/certificate_request/rest_spec.rb 
b/spec/integration/indirector/certificate_request/rest_spec.rb
index c718b78..68e76d3 100755
--- a/spec/integration/indirector/certificate_request/rest_spec.rb
+++ b/spec/integration/indirector/certificate_request/rest_spec.rb
@@ -22,6 +22,7 @@ describe "Certificate Request REST Terminus" do
     Puppet.settings[:group] = Process.gid
     Puppet.settings[:server] = "127.0.0.1"
     Puppet.settings[:masterport] = "34343"
+    Puppet.settings[:use_srv_records] = false
 
     Puppet[:servertype] = 'webrick'
     Puppet[:server] = '127.0.0.1'
diff --git a/spec/unit/network/resolver_spec.rb 
b/spec/unit/network/resolver_spec.rb
new file mode 100755
index 0000000..b2fb3af
--- /dev/null
+++ b/spec/unit/network/resolver_spec.rb
@@ -0,0 +1,64 @@
+#!/usr/bin/env ruby
+require File.dirname(__FILE__) + '/../../spec_helper'
+require 'puppet/network/resolver'
+
+describe Puppet::Network::Resolver do
+  before do
+    @dns_mock_object = mock('dns')
+    Resolv::DNS.expects(:new).returns(@dns_mock_object)
+
+    @rr_type           = Resolv::DNS::Resource::IN::SRV
+    @test_srv_hostname = "_puppet._tcp.domain.com" 
+    @test_a_hostname   = "puppet.domain.com"
+    @test_port         = 1000 
+  end
+
+
+  describe "when resolving a host without SRV records" do
+    it "should not yield anything" do
+
+      # No records returned for a DNS entry without any SRV records
+      @dns_mock_object.expects(:getresources).with(@test_a_hostname, 
@rr_type).returns([])
+
+      Puppet::Network::Resolver.by_srv(@test_a_hostname) do |hostname, port, 
remaining|
+        fail_with "host with no records passed block"
+      end
+    end
+  end
+
+  describe "when resolving a host with SRV records" do
+    it "should iterate through records in priority order" do
+      # The records we should use.
+      # priority, weight, port, hostname
+      test_records = [
+        Resolv::DNS::Resource::IN::SRV.new(0, 20,  8140, "puppet1.domain.com"),
+        Resolv::DNS::Resource::IN::SRV.new(0, 100, 8140, "puppet2.domain.com"),
+        Resolv::DNS::Resource::IN::SRV.new(1, 1,   8140, "puppet3.domain.com"),
+        Resolv::DNS::Resource::IN::SRV.new(4, 1,   8140, "puppet4.domain.com")
+      ]
+
+      # The order of the records that should be returned, 
+      # an array means unordered (for weight)
+      order = {
+        0 => ["puppet1.domain.com", "puppet2.domain.com"],
+        1 => ["puppet3.domain.com"],
+        2 => ["puppet4.domain.com"]
+      }
+      
+      @dns_mock_object.expects(:getresources).with(@test_srv_hostname, 
@rr_type).returns(test_records)
+
+      Puppet::Network::Resolver.by_srv(@test_srv_hostname) do |hostname, port|
+        expected_priority = order.keys.min
+
+        order[expected_priority].should include(hostname)
+        port.should_not be(@test_port)
+
+        # Remove the host from our expected hosts
+        order[expected_priority].delete hostname
+
+        # Remove this priority level if we're done with it
+        order.delete expected_priority if order[expected_priority] == []
+      end
+    end
+  end
+end
diff --git a/spec/unit/type/file/content_spec.rb 
b/spec/unit/type/file/content_spec.rb
index cde643f..88ec689 100755
--- a/spec/unit/type/file/content_spec.rb
+++ b/spec/unit/type/file/content_spec.rb
@@ -2,6 +2,8 @@
 
 Dir.chdir(File.dirname(__FILE__)) { (s = lambda { |f| File.exist?(f) ? 
require(f) : Dir.chdir("..") { s.call(f) } }).call("spec/spec_helper.rb") }
 
+require 'puppet/network/resolver'
+
 content = Puppet::Type.type(:file).attrclass(:content)
 describe content do
   before do
@@ -343,11 +345,11 @@ describe content do
       before(:each) do
         @response = stub_everything 'mock response', :code => "404"
         @conn = stub_everything 'connection'
-        @conn.stubs(:request_get).yields(@response)
+        @conn.stubs(:get).returns(@response)
         Puppet::Network::HttpPool.stubs(:http_instance).returns @conn
 
         @content.stubs(:actual_content).returns(nil)
-        @source = stub_everything 'source', :local? => false, :full_path => 
"/path/to/source", :server => "server", :port => 1234
+        @source = stub_everything 'source', :server? => true, :local? => 
false, :full_path => "/path/to/source", :server => "server", :port => 1234
         @resource.stubs(:parameter).with(:source).returns @source
 
         @sum = stub_everything 'sum'
@@ -363,7 +365,7 @@ describe content do
       end
 
       it "should send the correct indirection uri" do
-        @conn.expects(:request_get).with { |uri,headers| uri == 
"/production/file_content/path/to/source" }.yields(@response)
+        @conn.expects(:get).with { |uri,headers| uri == 
"/production/file_content/path/to/source" }.returns(@response)
         @content.write(@fh)
       end
 
-- 
1.7.1

-- 
You received this message because you are subscribed to the Google Groups 
"Puppet Developers" group.
To post to this group, send email to [email protected].
To unsubscribe from this group, send email to 
[email protected].
For more options, visit this group at 
http://groups.google.com/group/puppet-dev?hl=en.

Reply via email to