Ok, thanks for the review - I am sending out a new patch series. I
address your comments inline:
On 08/06/11 11:52, David Lutterkort wrote:
On Tue, 2011-06-07 at 20:45 +0300, [email protected] wrote:
From: marios<[email protected]>
Signed-off-by: marios<[email protected]>
Almost ACK; there are a few stylistic things that should be done to
simplify the code:
---
server/lib/deltacloud/drivers/ec2/ec2_driver.rb | 51 +++++++-
server/lib/deltacloud/helpers/blob_stream.rb | 155 +++++++++++++++++++++--
server/server.rb | 56 +++++++--
server/views/blobs/new.html.haml | 14 +-
4 files changed, 242 insertions(+), 34 deletions(-)
diff --git a/server/lib/deltacloud/drivers/ec2/ec2_driver.rb
b/server/lib/deltacloud/drivers/ec2/ec2_driver.rb
index 4edd989..7f1e021 100644
--- a/server/lib/deltacloud/drivers/ec2/ec2_driver.rb
+++ b/server/lib/deltacloud/drivers/ec2/ec2_driver.rb
@@ -355,9 +355,9 @@ module Deltacloud
bucket_list = s3_client.buckets
bucket_list.each do |current|
buckets<< Bucket.new({:name => current.name, :id =>
current.name})
- end #bucket_list.each
- end #if
- end #safely
+ end
+ end
+ end
filter_on(buckets, :id, opts)
end
@@ -387,8 +387,12 @@ module Deltacloud
blobs = []
safely do
s3_bucket = s3_client.bucket(opts['bucket'])
- s3_bucket.keys({}, true).each do |s3_object|
- blobs<< convert_object(s3_object)
+ unless(opts[:id]).nil?
Better: 'if opts[:id]'
+ blobs<< convert_object(s3_bucket.key(opts[:id], true))
+ else
+ s3_bucket.keys({}, true).each do |s3_object|
+ blobs<< convert_object(s3_object)
+ end
end
end
blobs = filter_on(blobs, :id, opts)
This and the previous hunk really belong in a different patch.
===> OK, patches 2 and 3 in the new series
@@ -396,7 +400,7 @@ module Deltacloud
end
#--
- # Create Blob
+ # Create Blob - NON Streaming way (i.e. was called with POST html
multipart form data)
#--
def create_blob(credentials, bucket_id, blob_id, data = nil, opts =
{})
s3_client = new_client(credentials, :s3)
@@ -445,6 +449,40 @@ module Deltacloud
end
end
+ #params: {'user','password','bucket','blob','content_type',
'content_length', 'metadata'}
+ def blob_stream_connection(params)
+ #canonicalise metadata:
+
#http://docs.amazonwebservices.com/AmazonS3/latest/dev/index.html?RESTAuthentication.html
+ metadata = params['metadata']
+ signature_meta_string = ""
+ unless metadata.nil?
+ metadata.gsub_keys('HTTP[-_]X[-_]Deltacloud[-_]Blobmeta[-_]',
'x-amz-meta-')
+ keys_array = metadata.keys.sort!
+ keys_array.each {|k| signature_meta_string<<
"#{k}:#{metadata[k]}\n"}
+ end
+ #s3.amazonaws.com
+ provider =
"https://#{Deltacloud::Drivers::driver_config[:ec2][:entrypoints]['s3']['us-east-1']}"
This should use endpoint_for_service so that provider selection works as
expected.
===> The problem is that we really MUST use 'https://s3.amazonaws.com' (i.e.
the us-east endpoint) for putting blobs. Amazon redirects your put to
the appropriate endpoint depending on your bucket location (e.g. to
s3-eu-west-1.amazonaws.com for European buckets). I tried putting
directly to the European endpoint but failed. The 'endpoint_for_service'
method checks
"(Thread.current[:provider] || ENV['API_PROVIDER'] || DEFAULT_REGION) "
for the endpoint, so it can return a regional endpoint instead of the
us-east one whenever a provider is selected.
+ uri = URI.parse(provider)
+ http = Net::HTTP.new("#{params['bucket']}.#{uri.host}", uri.port )
+ http.use_ssl = true
+ http.verify_mode = OpenSSL::SSL::VERIFY_NONE
+ timestamp = Time.now.httpdate
+ string_to_sign =
+
"PUT\n\n#{params['content_type']}\n#{timestamp}\n#{signature_meta_string}/#{params['bucket']}/#{params['blob']}"
+ auth_string = Aws::Utils::sign(params['password'], string_to_sign)
+ request = Net::HTTP::Put.new("/#{params['blob']}")
+ request['Host'] = "#{params['bucket']}.#{uri.host}"
+ request['Date'] = timestamp
+ request['Content-Type'] = params['content_type']
+ request['Content-Length'] = params['content_length']
+ request['Authorization'] = "AWS #{params['user']}:#{auth_string}"
+ request['Expect'] = "100-continue"
+ unless metadata.nil?
+ metadata.each{|k,v| request[k] = v}
+ end
+ return http, request
+ end
+
def storage_volumes(credentials, opts={})
ec2 = new_client( credentials )
volume_list = (opts and opts[:id]) ? opts[:id] : nil
@@ -582,7 +620,6 @@ module Deltacloud
end
private
-
def new_client(credentials, type = :ec2)
klass = case type
when :elb then Aws::Elb
diff --git a/server/lib/deltacloud/helpers/blob_stream.rb
b/server/lib/deltacloud/helpers/blob_stream.rb
index 00879a9..355a66a 100644
--- a/server/lib/deltacloud/helpers/blob_stream.rb
+++ b/server/lib/deltacloud/helpers/blob_stream.rb
@@ -17,7 +17,8 @@ include Deltacloud
begin
require 'eventmachine'
#--
- # based on the example from
http://macournoyer.com/blog/2009/06/04/pusher-and-async-with-thin/
+ # based on the example from
+ # http://macournoyer.com/blog/2009/06/04/pusher-and-async-with-thin/
#--
class BlobStream
AsyncResponse = [-1, {}, []].freeze
@@ -32,11 +33,11 @@ begin
'Content-Disposition' => params["content_disposition"],
'Content-Length' => "#{params['content_length']}"}, body]
}
- #call the driver from here. the driver method yields for every chunk of
blob it receives. We then
- #use body.call to write that chunk as received.
+ #call the driver from here. the driver method yields for every chunk
+ #of blob it receives. Then use body.call to write that chunk as received.
driver.blob_data(credentials, params[:bucket], params[:blob], params) {|chunk|
body.call ["#{chunk}"]} #close blob_data block
body.succeed
- AsyncResponse # Tells Thin to not close the connection and continue it's
work on other request
+ AsyncResponse # Tell Thin to not close connection& work other requests
end
end
The last two hunks only change formatting and should really go into a
separate patch.
==>OK patch 3 in the new series
@@ -69,13 +70,149 @@ class Hash
remove = []
self.each_key do |key|
if key.to_s.match(rgx_pattern)
- new_key = key.to_s.gsub(rgx_pattern, replacement)
+ new_key = key.to_s.gsub(rgx_pattern, replacement).downcase
self[new_key] = self[key]
remove<< key
- end #key.match
- end # each_key do
+ end
+ end
#remove the original keys
self.delete_if{|k,v| remove.include?(k)}
- end #def
+ end
+
+end
+
+#Monkey patch for streaming blobs:
+# Normally a client will upload a blob to deltacloud and thin will put
+# this into a tempfile. Then deltacloud would stream up to the provider:
+# i.e. client =-->>TEMP_FILE-->> deltacloud =-->>STREAM-->> provider
+# Instead we want to recognise that this is a 'PUT blob' operation and
+# start streaming to the provider as the request is received:
+# i.e. client =-->>STREAM-->> deltacloud =-->>STREAM-->> provider
+module Thin
+ class Request
+
+ alias_method :move_body_to_tempfile_orig, :move_body_to_tempfile if
defined?(Thin::Response)
Why does this need to be made conditional? If we are not running under
thin, it doesn't really matter what move_body_to_tempfile does, since
e.g. Webrick will never call it.
===> Basically I uninstalled thin entirely (i.e. the thin gem) in order
to test this code under webrick. In that case, the 'alias_method' failed
since Thin::Request.move_body_to_tempfile does not exist
(./server/bin/../lib/deltacloud/helpers/blob_stream.rb:94:in
`alias_method': undefined method `move_body_to_tempfile' for class
`Thin::Request' (NameError)).
+ private
+ def move_body_to_tempfile
+ if BlobStreamIO::is_put_blob(self)
+ @body = BlobStreamIO.new(self)
+ else
+ move_body_to_tempfile_orig
+ end
+ end
+
+ end
+end
+
+require 'net/http'
+#monkey patch for Net:HTTP
+module Net
+ class HTTP
-end #class
+ alias :request_orig :request
+
+ @blob_req # needs global scope for close op later
That line isn't needed.
done (sorry bad habits from my java days... )
+ def request(req, body = nil, blob_stream = nil,&block)
+ unless blob_stream
+ return request_orig(req, body,&block)
+ end
+ @blob_req = req
+ do_start #start the connection
+
+ req.set_body_internal body
+ begin_transport req
+ req.write_header_m @socket,@curr_http_version, edit_path(req.path)
+ @socket
+ end
+
+ class Put< HTTPRequest
+ def write_header_m(sock, ver, path)
+ write_header(sock, ver, path)
+ end
+ end
+
+ def end_request
+ begin
+ res = HTTPResponse.read_new(@socket)
+ end while res.kind_of?(HTTPContinue)
+ res.reading_body(@socket, @blob_req.response_body_permitted?) {
+ yield res if block_given?
+ }
+ end_transport @blob_req, res
Spaces, please
ok done
+ do_finish
+ res
+ end
+ end
+
+end
+
+require 'base64'
+class BlobStreamIO
+
+ attr_accessor :size, :provider, :sock
+
+ def initialize(request)
+ @client_request = request
+ @size = 0
+ bucket, blob = parse_bucket_blob(request.env["PATH_INFO"])
+ user, password = parse_credentials(request.env['HTTP_AUTHORIZATION'])
+ content_type = request.env['CONTENT_TYPE'] || ""
+ #deal with blob_metadata: (X-Deltacloud-Blobmeta-name: value)
+ user_meta = {}
+ meta_array = request.env.select{|k,v|
k.match(/^HTTP[-_]X[-_]Deltacloud[-_]Blobmeta[-_]/i)}
+ meta_array.inject({}){ |result, array| user_meta[array.first.upcase] =
array.last}
The inject here doesn't do anything, since you never change result, and
never use the return value. Either use each or something like
user_meta = meta_array.inject({}){ |res, kv| res[kv.first.upcase] =
kv.last; res }
thanks, done
Also, there's a lot of blob meta handling sprinkled through the code
(here, ec2 driver etc.) Not for this patch, but in general, it would be
cleaner to have some helper methods to deal with that. For example, a
helper method 'extract_blob_meta(headers, opts)' that does the above,
and takes an option to rename keys, so that in the ec2_driver you can
call
headers = extract_blob_meta(request.env, :rename =>
"x-amz-meta-")
ok, I created a new BlobHelper module in blob_stream.rb, patch 4 in the
new series
+ @content_length = request.env['CONTENT_LENGTH']
+ @http, provider_request = driver.blob_stream_connection({'user'=>user,
+ 'password'=>password, 'bucket'=>bucket, 'blob'=>blob, 'metadata'=>
user_meta,
+ 'content_type'=>content_type, 'content_length'=>@content_length })
Minor stylistic nit: it looks a little funky to use string keys here
instead of symbols.
done
+ @content_length = @content_length.to_i #for comparison of size in '<<
(data)'
+ @sock = @http.request(provider_request, nil, true)
+ end
+
+ def<< (data)
+ @sock.write(data)
+ @size += data.length
+ if (@size>= @content_length)
+ result = @http.end_request
+ if result.is_a?(Net::HTTPSuccess)
+ @client_request.env["BLOB_SUCCESS"] = "true"
+ else
+ @client_request.env["BLOB_FAIL"] = result.body
+ end
+ end
+ end
+
+ def rewind
+ end
+
+ #use the Request.env hash (populated by the ThinParser) to determine whether
+ #this is a post blob operation. By definition, only get here with a body of
+ #> 112kbytes - thin/lib/thin/request.rb:12 MAX_BODY = 1024 * (80 + 32)
+ def self.is_put_blob(request = nil)
+ path = request.env['PATH_INFO']
+ method = request.env['REQUEST_METHOD']
+ if ( path =~
/^#{Regexp.escape(Sinatra::UrlForHelper::DEFAULT_URI_PREFIX)}\/buckets/&&
method == 'PUT' )
+ return true
+ else
+ return false
+ end
+ end
+
+ private
+
+ def parse_bucket_blob(request_string)
+ array = request_string.split("/")
+ blob = array.pop
+ bucket = array.pop
+ return bucket, blob
+ end
+
+ def parse_credentials(request_string)
+ decoded = Base64.decode64(request_string.split('Basic ').last)
+ key = decoded.split(':').first
+ pass = decoded.split(':').last
+ return key, pass
+ end
+
+end
diff --git a/server/server.rb b/server/server.rb
index e332679..a8f5ee0 100644
--- a/server/server.rb
+++ b/server/server.rb
@@ -696,13 +696,46 @@ get
"#{Sinatra::UrlForHelper::DEFAULT_URI_PREFIX}/buckets/:bucket/new_blob" do
end
end
-#create a new blob
+#create a new blob using PUT - streams through deltacloud
+put "#{Sinatra::UrlForHelper::DEFAULT_URI_PREFIX}/buckets/:bucket/:blob" do
+ if(env["BLOB_SUCCESS"]) #ie got a 200ok after putting blob
+ content_type = env["CONTENT_TYPE"]
+ content_type ||= ""
+ @blob = driver.blob(credentials, {:id => params[:blob],
+ 'bucket' => params[:bucket]})
+ respond_to do |format|
+ format.html { haml :"blobs/show" }
+ format.xml { haml :"blobs/show" }
+ format.json { convert_to_json(:blobs, @blob) }
+ end
+ elsif(env["BLOB_FAIL"])
+ report_error(500) #OK?
+ else # small blobs -< 112kb dont hit the streaming monkey patch - redirect
to POST rule
+ # also, if running under webrick don't hit the streaming patch (Thin
specific)
+ path = bucket_url(params[:bucket])
+ status, headers, body = call! env.merge({"PATH_INFO" =>
"#{Sinatra::UrlForHelper::DEFAULT_URI_PREFIX}/buckets/#{params[:bucket]}",
+ "REQUEST_METHOD" => "POST",
+ "DELTACLOUD_BLOB_NAME" =>
params[:blob]})
Could we avoid calling the POST rule if we factored the things that both
PUT and POST need into a helper method ?
===> unfortunately no. The two differ in the way they get the params for
the driver.create_blob operation: how they get the metadata (http form
post vs http headers), how they get the blob data (http form post, vs
data given in rack.input which we have to put into a tempfile). The
commonality between the two methods is just the call to
'driver.create_blob' once the params are determined. However, to avoid
the call to POST from PUT (and hence the requirement for sinatra 1.2.4)
I simply put the code directly into the PUT method.
+ end
+end
+
+#create a new blob using html interface - NON STREAMING (i.e. browser POST
http form data)
post "#{Sinatra::UrlForHelper::DEFAULT_URI_PREFIX}/buckets/:bucket" do
bucket_id = params[:bucket]
- blob_id = params['blob_id']
- blob_data = params['blob_data']
+ #check if we were passed here from the PUT method - with a small blob
(<112kb)
+ if(env['DELTACLOUD_BLOB_NAME'])
+ blob_id = env['DELTACLOUD_BLOB_NAME']
+ temp_file = Tempfile.new("temp_blob_file")
+ temp_file.write(env['rack.input'].read)
+ temp_file.flush
+ content_type = env['CONTENT_TYPE'] || ""
+ blob_data = {:tempfile => temp_file, :type => content_type}
+ else
+ blob_id = params['blob']
+ blob_data = params['blob_data']
+ end
This seems too complicated to me ... if we have to jump through hoops to
get to the POST handler, and then through more hoops to have the POST
handler detect we are coming from PUT, we really should be splitting
stuff out into helper method(s)
as previous comment
user_meta = {}
-#first try get blob_metadata from params (i.e., passed by http form post, e.g.
browser)
+ #first try metadata from params (i.e., passed by http form post, e.g.
browser)
max = params[:meta_params]
if(max)
(1..max.to_i).each do |i|
@@ -710,12 +743,13 @@ post
"#{Sinatra::UrlForHelper::DEFAULT_URI_PREFIX}/buckets/:bucket" do
key = "HTTP_X_Deltacloud_Blobmeta_#{key}"
value = params[:"meta_value#{i}"]
user_meta[key] = value
- end #max.each do
- else #can try to get blob_metadata from http headers
- meta_array = request.env.select{|k,v|
k.match(/^HTTP[-_]X[-_]Deltacloud[-_]Blobmeta[-_]/i)}
- meta_array.inject({}){ |result, array| user_meta[array.first.upcase] =
array.last}
- end #end if
+ end
+ else #can try to get blob_metadata from http headers (i.e. from PUT)
+ meta_array = request.env.select{|k,v|
k.match(/^HTTP[-_]X[-_]Deltacloud[-_]Blobmeta[-_]/i)}
+ meta_array.inject({}){ |result, array| user_meta[array.first.upcase] =
array.last}
Another potential user of extract_blob_meta ;)
done, patch 4/4
+ end
@blob = driver.create_blob(credentials, bucket_id, blob_id, blob_data,
user_meta)
+ temp_file.delete if temp_file
respond_to do |format|
format.html { haml :"blobs/show"}
format.xml { haml :"blobs/show" }
@@ -730,7 +764,7 @@ delete
"#{Sinatra::UrlForHelper::DEFAULT_URI_PREFIX}/buckets/:bucket/:blob" do
respond_to do |format|
format.xml { 204 }
format.json { 204 }
- format.html { bucket_url(bucket_id) }
+ format.html { redirect(bucket_url(bucket_id)) }
end
end
@@ -768,7 +802,7 @@ get
"#{Sinatra::UrlForHelper::DEFAULT_URI_PREFIX}/buckets/:bucket/:blob" do
respond_to do |format|
format.html { haml :"blobs/show" }
format.xml { haml :"blobs/show" }
- format.json { convert_to_json(blobs, @blob) }
+ format.json { convert_to_json(:blobs, @blob) }
end
else
report_error(404)
diff --git a/server/views/blobs/new.html.haml b/server/views/blobs/new.html.haml
index a075f0a..bf5c6f5 100644
--- a/server/views/blobs/new.html.haml
+++ b/server/views/blobs/new.html.haml
@@ -3,13 +3,7 @@
%form{ :action => bucket_url(@bucket_id), :method => :post, :enctype =>
'multipart/form-data'}
%label
Blob Name:
- %input{ :name => 'blob_id', :size => 512}/
- %label
- Blob Data:
- %br
- %input{ :type => "file", :name => 'blob_data', :size => 50}/
- %br
- %br
+ %input{ :name => 'blob', :size => 512}/
%input{ :type => "hidden", :name => "meta_params", :value => "0"}
%a{ :href => "javascript:;", :onclick => "more_fields();"} Add Metadata
%div{ :id => "metadata_holder", :style => "display: none;"}
@@ -23,4 +17,10 @@
%a{ :href => "javascript:;", :onclick => "less_fields();"} Less Metadata
%br
%br
+ %label
+ Blob Data:
+ %br
+ %input{ :type => "file", :name => 'blob_data', :size => 50}/
+ %br
+ %br
%input{ :type => :submit, :name => "commit", :value => "create"}/