The queue abstract terminus allows the standard indirector behaviors to 
interact with a message queue broker, such that the indirector's "save" method 
writes the relevant model object out to a queue on the message broker.  While 
the indirector's "find" method does not map to a message queue, the queue 
terminus class offers a "subscribe" method that allows for easy implementation 
of an event loop, receiving indirected objects saved to a queue as they come in.

Signed-off-by: Ethan Rowe <[email protected]>
---
 lib/puppet/indirector/queue.rb |   78 +++++++++++++++++++++++++++++++++++
 spec/unit/indirector/queue.rb  |   87 ++++++++++++++++++++++++++++++++++++++++
 2 files changed, 165 insertions(+), 0 deletions(-)
 create mode 100644 lib/puppet/indirector/queue.rb
 create mode 100755 spec/unit/indirector/queue.rb

diff --git a/lib/puppet/indirector/queue.rb b/lib/puppet/indirector/queue.rb
new file mode 100644
index 0000000..c58af98
--- /dev/null
+++ b/lib/puppet/indirector/queue.rb
@@ -0,0 +1,78 @@
+require 'puppet/indirector/terminus'
+require 'puppet/util/queue'
+require 'yaml'
+
+# Implements the <tt>:queue</tt> abstract indirector terminus type, for storing
+# model instances to a message queue, presumably for the purpose of out-of-process
+# handling of changes related to the model.
+#
+# Relies upon Puppet::Util::Queue for registry and client object management,
+# and specifies a default queue type of <tt>:stomp</tt>, appropriate for use with a variety of message brokers.
+#
+# It's up to the queue client type to instantiate itself correctly based on Puppet configuration information.
+# 
+# A single queue client is maintained for the abstract terminus, meaning that you can only use one type
+# of queue client, one message broker solution, etc., with the indirection mechanism.
+#
+# Per-indirection queues are assumed, based on the indirection name.  If the <tt>:catalog</tt> indirection makes
+# use of this <tt>:queue</tt> terminus, queue operations work against the "catalog" queue.  It is up to the queue
+# client library to handle queue creation as necessary (for a number of popular queuing solutions, queue
+# creation is automatic and not a concern).
+class Puppet::Indirector::Queue < Puppet::Indirector::Terminus
+    extend ::Puppet::Util::Queue
+    self.queue_type_default = :stomp
+
+    # Queue has no idiomatic "find"
+    def find(request)
+        nil
+    end
+
+    # Place the request on the queue
+    def save(request)
+        begin
+            Puppet.info "Queueing catalog for %s" % request.key
+            client.send_message(queue, render(request.instance))
+        rescue => detail
+            raise Puppet::Error, "Could not write %s to queue: 
%s\nInstance::%s\n client : %s" % [request.key, 
detail,request.instance.to_s,client.to_s]
+        end
+    end
+
+    def self.queue
+        indirection_name
+    end
+
+    def queue
+        self.class.queue
+    end
+
+    # Returns the singleton queue client object.
+    def client
+        self.class.client
+    end
+
+    # Formats the model instance associated with _request_ appropriately for message delivery.
+    # Uses YAML serialization.
+    def render(obj)
+        YAML::dump(obj)
+    end
+
+    # Converts the _message_ from serialized format to an actual model instance.
+    def self.intern(message)
+        YAML::load(message)
+    end
+
+    # Provides queue subscription functionality; for a given indirection, use this method on the terminus
+    # to subscribe to the indirection-specific queue.  Your _block_ will be executed per new indirection
+    # model received from the queue, with _obj_ being the model instance.
+    def self.subscribe
+        client.subscribe(queue) do |msg|
+            begin
+                yield(self.intern(msg))
+            rescue => detail
+                # really, this should log the exception rather than raise it all the way up the stack;
+                # we don't want exceptions resulting from a single message bringing down a listener
+                raise Puppet::Error, "Error occured with subscription to queue 
%s for indirection %s: %s" % [queue, indirection_name, detail]
+            end
+        end
+    end
+end
diff --git a/spec/unit/indirector/queue.rb b/spec/unit/indirector/queue.rb
new file mode 100755
index 0000000..de9a27f
--- /dev/null
+++ b/spec/unit/indirector/queue.rb
@@ -0,0 +1,87 @@
+#!/usr/bin/env ruby
+
+require File.dirname(__FILE__) + '/../../spec_helper'
+require 'puppet/indirector/queue'
+
+class Puppet::Indirector::Queue::TestClient
+    def self.reset
+        @queues = {}
+    end
+
+    def self.queues
+        @queues ||= {}
+    end
+
+    def subscribe(queue)
+        stack = self.class.queues[queue] ||= []
+        while stack.length > 0 do
+            yield(stack.shift)
+        end
+    end
+
+    def send_message(queue, message)
+        stack = self.class.queues[queue] ||= []
+        stack.push(message)
+        queue
+    end
+end
+
+class FooExampleData
+    attr_accessor :name
+end
+
+describe Puppet::Indirector::Queue do
+    before :each do
+        @indirection = stub 'indirection', :name => :my_queue, 
:register_terminus_type => nil
+        
Puppet::Indirector::Indirection.stubs(:instance).with(:my_queue).returns(@indirection)
+        @store_class = Class.new(Puppet::Indirector::Queue) do
+            def self.to_s
+                'MyQueue::MyType'
+            end
+        end
+        @store = @store_class.new
+
+        @subject_class = FooExampleData
+        @subject = @subject_class.new
+        @subject.name = :me
+
+        Puppet.settings.stubs(:value).returns("bogus setting data")
+        Puppet.settings.stubs(:value).with(:queue_client).returns(:test_client)
+        
Puppet::Util::Queue.stubs(:queue_type_to_class).with(:test_client).returns(Puppet::Indirector::Queue::TestClient)
+        Puppet::Indirector::Queue::TestClient.reset
+
+        @request = stub 'request', :key => :me, :instance => @subject
+    end
+
+    it 'should use the correct client type and queue' do
+        @store.queue.should == :my_queue
+        @store.client.should 
be_an_instance_of(Puppet::Indirector::Queue::TestClient)
+    end
+
+    it 'should use render() to convert object to message' do
+        @store.expects(:render).with(@subject).once
+        @store.save(@request)
+    end
+
+    it 'should save and restore with the appropriate queue, and handle 
subscribe block' do
+        @subject_two = @subject_class.new
+        @subject_two.name = :too
+        @store.save(@request)
+        @store.save(stub('request_two', :key => 'too', :instance => 
@subject_two))
+
+        received = []
+        @store_class.subscribe do |obj|
+            received.push(obj)
+        end
+
+        received[0].name.should == @subject.name
+        received[1].name.should == @subject_two.name
+    end
+
+    it 'should use intern() to convert message to object with subscribe()' do
+        @store.save(@request)
+        @store_class.expects(:intern).with(@store.render(@subject)).once
+        @store_class.subscribe {|o| o }
+    end
+end
+
-- 
1.5.5.1


--~--~---------~--~----~------------~-------~--~----~
You received this message because you are subscribed to the Google Groups 
"Puppet Developers" group.
To post to this group, send email to [email protected]
To unsubscribe from this group, send email to 
[email protected]
For more options, visit this group at 
http://groups.google.com/group/puppet-dev?hl=en
-~----------~----~----~----~------~----~------~--~---

Reply via email to