When a file's source property requests file content, the request is routed (indirected) to P::I::FileContent::Rest. Instead of relying on the general REST subsystem to fetch a raw response containing the full file content, this class creates a special model wrapping a DeferredResponse. Later, when the file type writes the file to disk, it "streams" the content through the DeferredResponse, which lazily executes the network request. Because Net::HTTP only lets the response body be read (via read_body) from inside the request_get block, we're forced to do the streaming in a new Thread. Each chunk read is pushed onto a SizedQueue, from which the main thread pops it and writes it to disk. The checksum of the new content is computed chunk by chunk at the same time.
Signed-off-by: Brice Figureau <[email protected]> --- lib/puppet/file_serving/content_stream.rb | 23 +++ lib/puppet/indirector/file_content/rest.rb | 10 ++- lib/puppet/network/deferred_response.rb | 94 +++++++++++++ lib/puppet/network/formats.rb | 4 + lib/puppet/type/file.rb | 19 +++- spec/integration/network/deferred_response.rb | 34 +++++ spec/unit/file_serving/content_stream.rb | 29 ++++ spec/unit/indirector/file_content/rest.rb | 30 ++++- spec/unit/network/deferred_response.rb | 182 +++++++++++++++++++++++++ spec/unit/network/formats.rb | 4 + spec/unit/type/file.rb | 32 +++++ 11 files changed, 454 insertions(+), 7 deletions(-) create mode 100644 lib/puppet/file_serving/content_stream.rb create mode 100644 lib/puppet/network/deferred_response.rb create mode 100644 spec/integration/network/deferred_response.rb create mode 100644 spec/unit/file_serving/content_stream.rb create mode 100644 spec/unit/network/deferred_response.rb diff --git a/lib/puppet/file_serving/content_stream.rb b/lib/puppet/file_serving/content_stream.rb new file mode 100644 index 0000000..2f0b1a7 --- /dev/null +++ b/lib/puppet/file_serving/content_stream.rb @@ -0,0 +1,23 @@ +# +# Created by Luke Kanies on 2007-10-16. +# Copyright (c) 2007. All rights reserved. + +require 'puppet/indirector' +require 'puppet/file_serving' +require 'puppet/file_serving/content' + +# A class that handles retrieving file contents. +# It only reads the file when its content is specifically +# asked for. 
+class Puppet::FileServing::ContentStream < Puppet::FileServing::Content + + def self.create(content) + instance = new("/this/is/a/fake/path") + instance.content = content + instance + end + + def self.from_raw(content) + content + end +end diff --git a/lib/puppet/indirector/file_content/rest.rb b/lib/puppet/indirector/file_content/rest.rb index 7b3cade..6da9103 100644 --- a/lib/puppet/indirector/file_content/rest.rb +++ b/lib/puppet/indirector/file_content/rest.rb @@ -2,10 +2,18 @@ # Created by Luke Kanies on 2007-10-18. # Copyright (c) 2007. All rights reserved. -require 'puppet/file_serving/content' +require 'puppet/file_serving/content_stream' require 'puppet/indirector/file_content' require 'puppet/indirector/rest' +require 'puppet/network/deferred_response' class Puppet::Indirector::FileContent::Rest < Puppet::Indirector::REST desc "Retrieve file contents via a REST HTTP interface." + + # let's cheat a little bit: + # instead of returning a "model" we're returning something pretending to be a model + # which instead will fire a deferred request when needed + def find(request) + return Puppet::FileServing::ContentStream.create(Puppet::Network::DeferredResponse.new(request, indirection2uri(request), headers, self)) + end end diff --git a/lib/puppet/network/deferred_response.rb b/lib/puppet/network/deferred_response.rb new file mode 100644 index 0000000..9175e72 --- /dev/null +++ b/lib/puppet/network/deferred_response.rb @@ -0,0 +1,94 @@ +require 'net/http' +require 'thread' + +class Puppet::Network::DeferredResponse + + attr_accessor :request, :uri, :headers, :rest + attr_accessor :response + attr_accessor :request_started + + def initialize(request, uri, headers, rest) + @request = request + @uri = uri + @headers = headers + @rest = rest + @mutex = Mutex.new + @chunk_queue = SizedQueue.new(1) + @got_response = ConditionVariable.new + @chunk = nil + @request_started = false + end + + def stream? 
+ true + end + + def length + start_request + response.content_length + end + + def start_request + # bail out early if we already started the request + return if request_started? + + # launch the network request + Thread.new do + rest.network(request).request_get(uri, headers) do |response| + # we got a response from server + @mutex.synchronize do + @request_started = true + @response = response + @got_response.signal + end + + unless content = rest.deserialize(response) + fail "Could not find any content at %s" % request + end + + stream_response(content.content) + end + end + + # wait for the start of the response to be available + @mutex.synchronize do + @got_response.wait(@mutex) unless @request_started + end + end + + def stream + start_request + while true do + unless chunk = @chunk_queue.pop + # it's the end + break + end + + @checksum.update(chunk) if @checksum + + yield chunk + end + return @checksum + end + + # if we want the stream to be checksummed on the fly + # either set a Digest or a Puppet::Util::ChecksumStream + def checksum=(checksum) + @checksum = checksum + end + + def request_started? + @mutex.synchronize do + return @request_started + end + end + + def stream_response(stream) + stream.response.read_body do |chunk| + # send chunks to our consumer + @chunk_queue << chunk + end + # it's over guys + @chunk_queue << nil + end +end \ No newline at end of file diff --git a/lib/puppet/network/formats.rb b/lib/puppet/network/formats.rb index 8b9b68d..19053ec 100644 --- a/lib/puppet/network/formats.rb +++ b/lib/puppet/network/formats.rb @@ -143,6 +143,10 @@ Puppet::Network::FormatHandler.create(:raw, :mime => "application/x-raw", :weigh raise NotImplementedError end + def support_stream? 
+ true + end + # LAK:NOTE The format system isn't currently flexible enough to handle # what I need to support raw formats just for individual instances (rather # than both individual and collections), but we don't yet have enough data diff --git a/lib/puppet/type/file.rb b/lib/puppet/type/file.rb index 2f5b5df..3d3b8b6 100644 --- a/lib/puppet/type/file.rb +++ b/lib/puppet/type/file.rb @@ -710,7 +710,11 @@ module Puppet if validate = validate_checksum? # Use the appropriate checksum type -- md5, md5lite, etc. sumtype = property(:checksum).checktype - checksum ||= "{#{sumtype}}" + property(:checksum).send(sumtype, content) + if content.is_a?(String) + checksum ||= "{#{sumtype}}" + property(:checksum).send(sumtype, content) + else + content.checksum = property(:checksum).send("#{sumtype}_stream") + end end remove_existing(:file) @@ -729,7 +733,18 @@ module Puppet umask = mode ? 000 : 022 Puppet::Util.withumask(umask) do - File.open(path, File::CREAT|File::WRONLY|File::TRUNC, mode) { |f| f.print content } + File.open(path, File::CREAT|File::WRONLY|File::TRUNC, mode) do |f| + if content.is_a?(String) + f.print content + else # we're in front of a stream + newchecksum = content.stream do |c| + f.print c + end + if validate and newchecksum + checksum ||= "{#{sumtype}}" + newchecksum.checksum + end + end + end end # And put our new file in place diff --git a/spec/integration/network/deferred_response.rb b/spec/integration/network/deferred_response.rb new file mode 100644 index 0000000..01cb6ba --- /dev/null +++ b/spec/integration/network/deferred_response.rb @@ -0,0 +1,34 @@ +#!/usr/bin/env ruby + +require File.dirname(__FILE__) + '/../../spec_helper' + +require 'puppet/network/deferred_response' + +describe Puppet::Network::DeferredResponse do + before(:each) do + @request = stub_everything 'request' + @uri = stub_everything 'uri' + @headers = stub_everything 'headers' + + @request = stub_everything 'request' + @http = stub_everything 'http' + @rest = stub_everything 
'rest', :network => @http + + @response = stub_everything 'response' + @http.stubs(:request_get).yields(@response) + + @stream = stub_everything 'stream', :response => @response + @response.stubs(:read_body).multiple_yields("chunk1", "chunk2") + @content = stub_everything 'content', :content => @stream + + @rest.stubs(:deserialize).returns(@content) + + @deferred = Puppet::Network::DeferredResponse.new(@request, @uri, @headers, @rest) + end + + it "should pass chunks to the main thread" do + @deferred.stream do |c| + c.should match /^chunk\d/ + end + end +end \ No newline at end of file diff --git a/spec/unit/file_serving/content_stream.rb b/spec/unit/file_serving/content_stream.rb new file mode 100644 index 0000000..b0d1874 --- /dev/null +++ b/spec/unit/file_serving/content_stream.rb @@ -0,0 +1,29 @@ +#!/usr/bin/env ruby + +require File.dirname(__FILE__) + '/../../spec_helper' + +require 'puppet/file_serving/content_stream' + +describe Puppet::FileServing::ContentStream do + it "should should be a subclass of Content" do + Puppet::FileServing::ContentStream.superclass.should equal(Puppet::FileServing::Content) + end + + it "should be able to create a content instance" do + Puppet::FileServing::ContentStream.should respond_to(:create) + end + + it "should return the content itself when converting from raw" do + content = stub 'content' + Puppet::FileServing::ContentStream.from_raw(content).should == content + end + + it "should create an instance with a fake file name and correct content when converting from raw" do + instance = mock 'instance' + Puppet::FileServing::ContentStream.expects(:new).with("/this/is/a/fake/path").returns instance + + instance.expects(:content=).with "foo/bar" + + Puppet::FileServing::ContentStream.create("foo/bar").should equal(instance) + end +end diff --git a/spec/unit/indirector/file_content/rest.rb b/spec/unit/indirector/file_content/rest.rb index afb674e..4fa0d20 100755 --- a/spec/unit/indirector/file_content/rest.rb +++ 
b/spec/unit/indirector/file_content/rest.rb @@ -2,10 +2,32 @@ require File.dirname(__FILE__) + '/../../../spec_helper' -require 'puppet/indirector/file_content' +require 'puppet/indirector/file_content/rest' +require 'puppet/file_serving/content' +require 'puppet/file_serving/content_stream' -describe "Puppet::Indirector::Content::Rest" do - it "should add the node's cert name to the arguments" +describe "Puppet::Indirector::FileContent::Rest" do + it "should be a sublcass of Puppet::Indirector::REST" do + Puppet::Indirector::FileContent::Rest.superclass.should equal(Puppet::Indirector::REST) + end - it "should set the content type to text/plain" + describe "when finding" do + before(:each) do + @request = stub_everything 'request' + Puppet::FileServing::Content.terminus_class = :rest + @indirector = Puppet::Indirector::FileContent::Rest.new + @indirector.stubs(:indirection2uri).with(@request).returns("/here") + Puppet::Network::DeferredResponse.stubs(:new) + end + + it "should return a Puppet::FileServing::ContentStream as model" do + @indirector.find(@request).should be_instance_of(Puppet::FileServing::ContentStream) + end + + it "should return a content stream wrapping a deferred response" do + content = stub_everything 'content' + Puppet::Network::DeferredResponse.expects(:new).returns content + @indirector.find(@request).content.should == content + end + end end diff --git a/spec/unit/network/deferred_response.rb b/spec/unit/network/deferred_response.rb new file mode 100644 index 0000000..858229e --- /dev/null +++ b/spec/unit/network/deferred_response.rb @@ -0,0 +1,182 @@ +#!/usr/bin/env ruby + +require File.dirname(__FILE__) + '/../../spec_helper' + +require 'puppet/network/deferred_response' + +describe Puppet::Network::DeferredResponse do + before(:each) do + @request = stub_everything 'request' + @uri = stub_everything 'uri' + @headers = stub_everything 'headers' + + @request = stub_everything 'request' + @http = stub_everything 'http', :request_get => 
@request + @rest = stub_everything 'rest', :network => @http + + @chunk_queue = stub_everything 'chunk_queue' + SizedQueue.expects(:new).returns(@chunk_queue) + + @got_response = stub_everything 'got_response' + ConditionVariable.stubs(:new).returns(@got_response) + + @mutex = stub_everything 'mutex' + @mutex.stubs(:synchronize).yields + Mutex.expects(:new).returns(@mutex) + + @deferred = Puppet::Network::DeferredResponse.new(@request, @uri, @headers, @rest) + end + + it "should be a 'stream'" do + @deferred.should be_stream + end + + it "should be possible to set a checksum" do + @deferred.should be_respond_to(:checksum=) + end + + describe "when getting length" do + before(:each) do + @response = stub_everything 'response' + @deferred.response = @response + @deferred.stubs(:start_request) + end + + it "should start the network request" do + @deferred.expects(:start_request) + @deferred.length + end + + it "should return the response content length" do + @response.expects(:content_length).returns 10 + @deferred.length.should == 10 + end + end + + describe "when streaming" do + before(:each) do + @deferred.stubs(:start_request) + end + + it "should start the network request" do + @deferred.expects(:start_request) + @deferred.stream + end + + it "should fetch one chunk from the chunk queue" do + @chunk_queue.expects(:pop).twice.returns("chunk", nil) + @deferred.stream { |c| } + end + + it "should fetch all chunks from the chunk queue until nil" do + @chunk_queue.expects(:pop).times(3).returns("chunk1", "chunk2", nil) + @deferred.stream { |c| + c.should match /^chunk\d/ + } + end + + it "should give the chunk to the current checksum stream" do + checksum = stub_everything 'checksum' + @deferred.checksum = checksum + + @chunk_queue.expects(:pop).twice.returns("chunk", nil) + checksum.expects(:update).with("chunk") + + @deferred.stream { |c| } + end + + it "should yield the chunks to the given block" do + @chunk_queue.expects(:pop).twice.returns("chunk", nil) + + 
@deferred.stream { |c| + c.should == "chunk" + } + end + + it "should return the checksum" do + checksum = stub_everything 'checksum' + @deferred.checksum = checksum + + @deferred.stream.should == checksum + end + end + + describe "when issueing the network request" do + before(:each) do + Thread.stubs(:new).yields(nil) + @stream = stub_everything 'stream' + @content = stub_everything 'content', :content => @stream + @response = stub_everything 'response' + @content.stubs(:response).returns(@response) + end + + it "should return early if the request has already been started" do + @deferred.expects(:request_started?).returns(true) + @rest.expects(:network).never + @deferred.start_request + end + + it "should launch the request in a new Thread" do + Thread.expects(:new).yields + @rest.expects(:network).with(@request).returns(@http) + @http.expects(:request_get).with(@uri, @headers) + @deferred.start_request + end + + describe "and the network thread" do + before(:each) do + Thread.expects(:new).yields + @rest.stubs(:network).with(@request).returns(@http) + @rest.stubs(:deserialize).returns(@content) + @http.stubs(:request_get).with(@uri, @headers).yields(@response) + @deferred.stubs(:stream_response) + end + + it "should let everyone know the request has started" do + @deferred.start_request + @deferred.request_started?.should be true + end + + it "should signal other threads that the response is ready" do + @got_response.expects(:signal) + @deferred.start_request + end + + it "should store the current response" do + @deferred.start_request + @deferred.response.should == @response + end + + it "should deserialize the response" do + @rest.expects(:deserialize).with(@response).returns(@content) + @deferred.start_request + end + + it "should stream the response content" do + @deferred.expects(:stream_response).with(@stream) + @deferred.start_request + end + end + + describe "and the other thread" do + it "should finally wait the request to be started" do + 
@got_response.expects(:wait) + @deferred.start_request + end + end + + describe "when streaming the response content" do + it "should put each chunk in the chunk queue" do + @response.expects(:read_body).multiple_yields("chunk1","chunk2") + @chunk_queue.expects(:<<).with("chunk1").then.with("chunk2") + @deferred.stream_response(@content) + end + + it "should enqueue nil at the end" do + @chunk_queue.expects(:<<).with(nil) + @deferred.stream_response(@content) + end + end + end + +end \ No newline at end of file diff --git a/spec/unit/network/formats.rb b/spec/unit/network/formats.rb index 208c6f4..c1f74f9 100755 --- a/spec/unit/network/formats.rb +++ b/spec/unit/network/formats.rb @@ -280,6 +280,10 @@ describe "Puppet Network Format" do @format.should be_supported(String) end + it "should always support streaming" do + @format.should be_support_stream + end + it "should fail if its multiple_render method is used" do lambda { @format.render_multiple("foo") }.should raise_error(NotImplementedError) end diff --git a/spec/unit/type/file.rb b/spec/unit/type/file.rb index 1b3fe6a..49c1d34 100755 --- a/spec/unit/type/file.rb +++ b/spec/unit/type/file.rb @@ -774,5 +774,37 @@ describe Puppet::Type.type(:file) do lambda { file.write("something", :content) }.should raise_error(Puppet::Error) end + + it "should stream streamable content" do + file = Puppet::Type::File.new(:name => "/my/file", :backup => "puppet") + f = stub_everything 'file' + File.stubs(:open).yields(f) + File.stubs(:rename) + content = stub_everything 'content' + + content.expects(:stream).yields("chunk") + f.expects(:print).with("chunk") + + file.write(content, :content) + end + + it "should use the streamed checksum" do + file = Puppet::Type::File.new(:name => "/my/file", :backup => "puppet") + f = stub_everything 'file' + File.stubs(:open).yields(f) + File.stubs(:rename) + file.stubs(:validate_checksum?).returns(true) + file.stubs(:fail_if_checksum_is_wrong) + content = stub_everything 'content' + 
property = stub_everything 'property' + file.stubs(:property).returns property + property.stubs(:checktype).returns(:md5) + checksum = stub_everything 'checksum', :checksum => "DEADBEEF" + content.stubs(:stream).returns(checksum) + + file.expects(:setchecksum).with("{md5}DEADBEEF") + + file.write(content, :content) + end end end -- 1.6.6.1 -- You received this message because you are subscribed to the Google Groups "Puppet Developers" group. To post to this group, send email to [email protected]. To unsubscribe from this group, send email to [email protected]. For more options, visit this group at http://groups.google.com/group/puppet-dev?hl=en.
