On Jun 8, 2011, at 7:09 PM, [email protected] wrote: ACK.
A minor stylistic comments inline. -- Michal > From: marios <[email protected]> > > > Signed-off-by: marios <[email protected]> > --- > server/lib/deltacloud/drivers/ec2/ec2_driver.rb | 37 ++++++- > server/lib/deltacloud/helpers/blob_stream.rb | 142 ++++++++++++++++++++++- > server/server.rb | 51 +++++++-- > server/views/blobs/new.html.haml | 14 +- > 4 files changed, 220 insertions(+), 24 deletions(-) > > diff --git a/server/lib/deltacloud/drivers/ec2/ec2_driver.rb > b/server/lib/deltacloud/drivers/ec2/ec2_driver.rb > index 4edd989..d37e8ab 100644 > --- a/server/lib/deltacloud/drivers/ec2/ec2_driver.rb > +++ b/server/lib/deltacloud/drivers/ec2/ec2_driver.rb > @@ -396,7 +396,7 @@ module Deltacloud > end > > #-- > - # Create Blob > + # Create Blob - NON Streaming way (i.e. was called with POST html > multipart form data) > #-- > def create_blob(credentials, bucket_id, blob_id, data = nil, opts = > {}) > s3_client = new_client(credentials, :s3) > @@ -445,6 +445,40 @@ module Deltacloud > end > end > > + #params: > {:user,:password,:bucket,:blob,:content_type,:content_length,:metadata} > + def blob_stream_connection(params) > + #canonicalise metadata: > + > #http://docs.amazonwebservices.com/AmazonS3/latest/dev/index.html?RESTAuthentication.html > + metadata = params[:metadata] > + signature_meta_string = "" > + unless metadata.nil? > + metadata.gsub_keys('HTTP[-_]X[-_]Deltacloud[-_]Blobmeta[-_]', > 'x-amz-meta-') > + keys_array = metadata.keys.sort! > + keys_array.each {|k| signature_meta_string << > "#{k}:#{metadata[k]}\n"} > + end > + #s3.amazonaws.com > + provider = > "https://#{Deltacloud::Drivers::driver_config[:ec2][:entrypoints]['s3']['us-east-1']}" How about making this configurable by headers o matrix params? Like I want to use different provider than us-east-1 ? > + uri = URI.parse(provider) > + http = Net::HTTP.new("#{params[:bucket]}.#{uri.host}", uri.port ) > + http.use_ssl = true > + http.verify_mode = OpenSSL::SSL::VERIFY_NONE > + timestamp = Time.now.httpdate > + string_to_sign = > + > "PUT\n\n#{params[:content_type]}\n#{timestamp}\n#{signature_meta_string}/#{params[:bucket]}/#{params[:blob]}" > + auth_string = Aws::Utils::sign(params[:password], string_to_sign) > + request = Net::HTTP::Put.new("/#{params[:blob]}") > + request['Host'] = "#{params[:bucket]}.#{uri.host}" > + request['Date'] = timestamp > + request['Content-Type'] = params[:content_type] > + request['Content-Length'] = params[:content_length] > + request['Authorization'] = "AWS #{params[:user]}:#{auth_string}" > + request['Expect'] = "100-continue" > + unless metadata.nil? > + metadata.each{|k,v| request[k] = v} > + end > + return http, request > + end > + > def storage_volumes(credentials, opts={}) > ec2 = new_client( credentials ) > volume_list = (opts and opts[:id]) ? opts[:id] : nil > @@ -582,7 +616,6 @@ module Deltacloud > end > > private > - > def new_client(credentials, type = :ec2) > klass = case type > when :elb then Aws::Elb > diff --git a/server/lib/deltacloud/helpers/blob_stream.rb > b/server/lib/deltacloud/helpers/blob_stream.rb > index 00879a9..094bfe6 100644 > --- a/server/lib/deltacloud/helpers/blob_stream.rb > +++ b/server/lib/deltacloud/helpers/blob_stream.rb > @@ -69,13 +69,145 @@ class Hash > remove = [] > self.each_key do |key| > if key.to_s.match(rgx_pattern) > - new_key = key.to_s.gsub(rgx_pattern, replacement) > + new_key = key.to_s.gsub(rgx_pattern, replacement).downcase > self[new_key] = self[key] > remove << key > - end #key.match > - end # each_key do > + end > + end > #remove the original keys > self.delete_if{|k,v| remove.include?(k)} > - end #def > + end > + > +end > + > +#Monkey patch for streaming blobs: > +# Normally a client will upload a blob to deltacloud and thin will put > +# this into a tempfile. Then deltacloud would stream up to the provider: > +# i.e. client =-->>TEMP_FILE-->> deltacloud =-->>STREAM-->> provider > +# Instead we want to recognise that this is a 'PUT blob' operation and > +# start streaming to the provider as the request is received: > +# i.e. client =-->>STREAM-->> deltacloud =-->>STREAM-->> provider > +module Thin > + class Request > + > + alias_method :move_body_to_tempfile_orig, :move_body_to_tempfile if > defined?(Thin::Response) > + private > + def move_body_to_tempfile > + if BlobStreamIO::is_put_blob(self) > + @body = BlobStreamIO.new(self) > + else > + move_body_to_tempfile_orig > + end > + end > + > + end > +end > + > +require 'net/http' > +#monkey patch for Net:HTTP > +module Net > + class HTTP > + > + alias :request_orig :request > + > + def request(req, body = nil, blob_stream = nil, &block) > + unless blob_stream > + return request_orig(req, body, &block) > + end > + @blob_req = req > + do_start #start the connection > + > + req.set_body_internal body > + begin_transport req > + req.write_header_m @socket,@curr_http_version, edit_path(req.path) > + @socket > + end > + > + class Put < HTTPRequest > + def write_header_m(sock, ver, path) > + write_header(sock, ver, path) > + end > + end > + > + def end_request > + begin > + res = HTTPResponse.read_new(@socket) > + end while res.kind_of?(HTTPContinue) > + res.reading_body(@socket, @blob_req.response_body_permitted?) { > + yield res if block_given? } > + end_transport @blob_req, res > + do_finish > + res > + end > + end > + > +end > + > +require 'base64' > +class BlobStreamIO > + > + attr_accessor :size, :provider, :sock > + > + def initialize(request) > + @client_request = request > + @size = 0 > + bucket, blob = parse_bucket_blob(request.env["PATH_INFO"]) > + user, password = parse_credentials(request.env['HTTP_AUTHORIZATION']) > + content_type = request.env['CONTENT_TYPE'] || "" > + #deal with blob_metadata: (X-Deltacloud-Blobmeta-name: value) > + meta_array = request.env.select{|k,v| > k.match(/^HTTP[-_]X[-_]Deltacloud[-_]Blobmeta[-_]/i)} > + user_meta = meta_array.inject({}){ |result, array| > result[array.first.upcase] = array.last; result} > + @content_length = request.env['CONTENT_LENGTH'] > + @http, provider_request = driver.blob_stream_connection({:user=>user, > + :password=>password, :bucket=>bucket, :blob=>blob, :metadata=> > user_meta, > + :content_type=>content_type, :content_length=>@content_length }) > + @content_length = @content_length.to_i #for comparison of size in '<< > (data)' > + @sock = @http.request(provider_request, nil, true) > + end > + > + def << (data) > + @sock.write(data) > + @size += data.length > + if (@size >= @content_length) > + result = @http.end_request > + if result.is_a?(Net::HTTPSuccess) > + @client_request.env["BLOB_SUCCESS"] = "true" > + else > + @client_request.env["BLOB_FAIL"] = result.body > + end > + end > + end > > -end #class > + def rewind > + end > + > + #use the Request.env hash (populated by the ThinParser) to determine > whether > + #this is a post blob operation. By definition, only get here with a body of > + # > 112kbytes - thin/lib/thin/request.rb:12 MAX_BODY = 1024 * (80 + 32) > + def self.is_put_blob(request = nil) > + path = request.env['PATH_INFO'] > + method = request.env['REQUEST_METHOD'] > + if ( path =~ > /^#{Regexp.escape(Sinatra::UrlForHelper::DEFAULT_URI_PREFIX)}\/buckets/ && > method == 'PUT' ) > + return true > + else > + return false > + end The 'if' construction here is not necessary, since condition will return true/false anyway: ( path =~ /^#{Regexp.escape(Sinatra::UrlForHelper::DEFAULT_URI_PREFIX)}\/buckets/ && method == 'PUT' ) > + end > + > + private > + > + def parse_bucket_blob(request_string) > + array = request_string.split("/") > + blob = array.pop > + bucket = array.pop > + return bucket, blob > + end > + > + def parse_credentials(request_string) > + decoded = Base64.decode64(request_string.split('Basic ').last) > + key = decoded.split(':').first > + pass = decoded.split(':').last > + return key, pass > + end > + > +end > diff --git a/server/server.rb b/server/server.rb > index e332679..32d0e33 100644 > --- a/server/server.rb > +++ b/server/server.rb > @@ -696,13 +696,47 @@ get > "#{Sinatra::UrlForHelper::DEFAULT_URI_PREFIX}/buckets/:bucket/new_blob" do > end > end > > -#create a new blob > +#create a new blob using PUT - streams through deltacloud > +put "#{Sinatra::UrlForHelper::DEFAULT_URI_PREFIX}/buckets/:bucket/:blob" do > + if(env["BLOB_SUCCESS"]) #ie got a 200ok after putting blob > + content_type = env["CONTENT_TYPE"] > + content_type ||= "" > + @blob = driver.blob(credentials, {:id => params[:blob], > + 'bucket' => params[:bucket]}) It's better to use Symbol as Hash key instead of String. > + respond_to do |format| > + format.html { haml :"blobs/show" } > + format.xml { haml :"blobs/show" } > + format.json { convert_to_json(:blobs, @blob) } > + end > + elsif(env["BLOB_FAIL"]) > + report_error(500) #OK? > + else # small blobs - < 112kb dont hit the streaming monkey patch - use > 'normal' create_blob > + # also, if running under webrick don't hit the streaming patch (Thin > specific) > + bucket_id = params[:bucket] > + blob_id = params[:blob] > + temp_file = Tempfile.new("temp_blob_file") > + temp_file.write(env['rack.input'].read) > + temp_file.flush > + content_type = env['CONTENT_TYPE'] || "" > + blob_data = {:tempfile => temp_file, :type => content_type} > + meta_array = request.env.select{|k,v| > k.match(/^HTTP[-_]X[-_]Deltacloud[-_]Blobmeta[-_]/i)} > + user_meta = meta_array.inject({}){ |result, array| > result[array.first.upcase] = array.last; result} > + @blob = driver.create_blob(credentials, bucket_id, blob_id, blob_data, > user_meta) > + temp_file.delete > + respond_to do |format| > + format.html { haml :"blobs/show"} > + format.xml { haml :"blobs/show" } > + end > + end > +end > + > +#create a new blob using html interface - NON STREAMING (i.e. browser POST > http form data) > post "#{Sinatra::UrlForHelper::DEFAULT_URI_PREFIX}/buckets/:bucket" do > bucket_id = params[:bucket] > - blob_id = params['blob_id'] > + blob_id = params['blob'] > blob_data = params['blob_data'] > user_meta = {} > -#first try get blob_metadata from params (i.e., passed by http form post, > e.g. browser) > + #metadata from params (i.e., passed by http form post, e.g. browser) > max = params[:meta_params] > if(max) > (1..max.to_i).each do |i| > @@ -710,11 +744,8 @@ post > "#{Sinatra::UrlForHelper::DEFAULT_URI_PREFIX}/buckets/:bucket" do > key = "HTTP_X_Deltacloud_Blobmeta_#{key}" > value = params[:"meta_value#{i}"] > user_meta[key] = value > - end #max.each do > - else #can try to get blob_metadata from http headers > - meta_array = request.env.select{|k,v| > k.match(/^HTTP[-_]X[-_]Deltacloud[-_]Blobmeta[-_]/i)} > - meta_array.inject({}){ |result, array| user_meta[array.first.upcase] = > array.last} > - end #end if > + end > + end > @blob = driver.create_blob(credentials, bucket_id, blob_id, blob_data, > user_meta) > respond_to do |format| > format.html { haml :"blobs/show"} > @@ -730,7 +761,7 @@ delete > "#{Sinatra::UrlForHelper::DEFAULT_URI_PREFIX}/buckets/:bucket/:blob" do > respond_to do |format| > format.xml { 204 } > format.json { 204 } > - format.html { bucket_url(bucket_id) } > + format.html { redirect(bucket_url(bucket_id)) } > end > end > > @@ -768,7 +799,7 @@ get > "#{Sinatra::UrlForHelper::DEFAULT_URI_PREFIX}/buckets/:bucket/:blob" do > respond_to do |format| > format.html { haml :"blobs/show" } > format.xml { haml :"blobs/show" } > - format.json { convert_to_json(blobs, @blob) } > + format.json { convert_to_json(:blobs, @blob) } > end > else > report_error(404) > diff --git a/server/views/blobs/new.html.haml > b/server/views/blobs/new.html.haml > index a075f0a..bf5c6f5 100644 > --- a/server/views/blobs/new.html.haml > +++ b/server/views/blobs/new.html.haml > @@ -3,13 +3,7 @@ > %form{ :action => bucket_url(@bucket_id), :method => :post, :enctype => > 'multipart/form-data'} > %label > Blob Name: > - %input{ :name => 'blob_id', :size => 512}/ > - %label > - Blob Data: > - %br > - %input{ :type => "file", :name => 'blob_data', :size => 50}/ > - %br > - %br > + %input{ :name => 'blob', :size => 512}/ > %input{ :type => "hidden", :name => "meta_params", :value => "0"} > %a{ :href => "javascript:;", :onclick => "more_fields();"} Add Metadata > %div{ :id => "metadata_holder", :style => "display: none;"} > @@ -23,4 +17,10 @@ > %a{ :href => "javascript:;", :onclick => "less_fields();"} Less Metadata > %br > %br > + %label > + Blob Data: > + %br > + %input{ :type => "file", :name => 'blob_data', :size => 50}/ > + %br > + %br > %input{ :type => :submit, :name => "commit", :value => "create"}/ > -- > 1.7.3.4 > ------------------------------------------------------ Michal Fojtik, [email protected] Deltacloud API: http://deltacloud.org
