OK, thanks for the review — I am sending out a new patch series. My replies to your comments are inline:

On 08/06/11 11:52, David Lutterkort wrote:
On Tue, 2011-06-07 at 20:45 +0300, [email protected] wrote:
From: marios<[email protected]>


Signed-off-by: marios<[email protected]>

Almost ACK; there are a few stylistic things that should be done to
simplify the code:

---
  server/lib/deltacloud/drivers/ec2/ec2_driver.rb |   51 +++++++-
  server/lib/deltacloud/helpers/blob_stream.rb    |  155 +++++++++++++++++++++--
  server/server.rb                                |   56 +++++++--
  server/views/blobs/new.html.haml                |   14 +-
  4 files changed, 242 insertions(+), 34 deletions(-)

diff --git a/server/lib/deltacloud/drivers/ec2/ec2_driver.rb 
b/server/lib/deltacloud/drivers/ec2/ec2_driver.rb
index 4edd989..7f1e021 100644
--- a/server/lib/deltacloud/drivers/ec2/ec2_driver.rb
+++ b/server/lib/deltacloud/drivers/ec2/ec2_driver.rb
@@ -355,9 +355,9 @@ module Deltacloud
                bucket_list = s3_client.buckets
                bucket_list.each do |current|
                  buckets<<  Bucket.new({:name =>  current.name, :id =>  
current.name})
-              end #bucket_list.each
-            end #if
-          end #safely
+              end
+            end
+          end
            filter_on(buckets, :id, opts)
          end

@@ -387,8 +387,12 @@ module Deltacloud
            blobs = []
            safely do
              s3_bucket = s3_client.bucket(opts['bucket'])
-            s3_bucket.keys({}, true).each do |s3_object|
-              blobs<<  convert_object(s3_object)
+            unless(opts[:id]).nil?

Better: 'if opts[:id]'

+              blobs<<  convert_object(s3_bucket.key(opts[:id], true))
+            else
+              s3_bucket.keys({}, true).each do |s3_object|
+                blobs<<  convert_object(s3_object)
+              end
              end
            end
            blobs = filter_on(blobs, :id, opts)

This and the previous hunk really belong in a different patch.

===> OK, patches 2 and 3 in the new series




@@ -396,7 +400,7 @@ module Deltacloud
          end

          #--
-        # Create Blob
+        # Create Blob - NON Streaming way (i.e. was called with POST html 
multipart form data)
          #--
          def create_blob(credentials, bucket_id, blob_id, data = nil, opts = 
{})
            s3_client = new_client(credentials, :s3)
@@ -445,6 +449,40 @@ module Deltacloud
            end
          end

+        #params: {'user','password','bucket','blob','content_type', 
'content_length', 'metadata'}
+        def blob_stream_connection(params)
+          #canonicalise metadata:
+          
#http://docs.amazonwebservices.com/AmazonS3/latest/dev/index.html?RESTAuthentication.html
+          metadata = params['metadata']
+          signature_meta_string = ""
+          unless metadata.nil?
+            metadata.gsub_keys('HTTP[-_]X[-_]Deltacloud[-_]Blobmeta[-_]', 
'x-amz-meta-')
+            keys_array = metadata.keys.sort!
+            keys_array.each {|k| signature_meta_string<<  
"#{k}:#{metadata[k]}\n"}
+          end
+          #s3.amazonaws.com
+          provider = 
"https://#{Deltacloud::Drivers::driver_config[:ec2][:entrypoints]['s3']['us-east-1']}"

This should use endpoint_for_service so that provider selection works as
expected.

===> The problem is that we really MUST use 'https://s3.amazonaws.com' (i.e. the us-east endpoint) for PUTting blobs. Amazon redirects the PUT to the appropriate endpoint depending on the bucket's location (e.g. to s3-eu-west-1.amazonaws.com for European buckets); when I tried PUTting directly to the European endpoint, it failed. The 'endpoint_for_service' method picks the endpoint from "(Thread.current[:provider] || ENV['API_PROVIDER'] || DEFAULT_REGION)", so it would select a region-specific endpoint rather than the us-east one that the PUT requires.



+          uri = URI.parse(provider)
+          http = Net::HTTP.new("#{params['bucket']}.#{uri.host}", uri.port )
+          http.use_ssl = true
+          http.verify_mode = OpenSSL::SSL::VERIFY_NONE
+          timestamp = Time.now.httpdate
+          string_to_sign =
+            
"PUT\n\n#{params['content_type']}\n#{timestamp}\n#{signature_meta_string}/#{params['bucket']}/#{params['blob']}"
+          auth_string = Aws::Utils::sign(params['password'], string_to_sign)
+          request = Net::HTTP::Put.new("/#{params['blob']}")
+          request['Host'] = "#{params['bucket']}.#{uri.host}"
+          request['Date'] = timestamp
+          request['Content-Type'] = params['content_type']
+          request['Content-Length'] = params['content_length']
+          request['Authorization'] = "AWS #{params['user']}:#{auth_string}"
+          request['Expect'] = "100-continue"
+          unless metadata.nil?
+            metadata.each{|k,v| request[k] = v}
+          end
+          return http, request
+        end
+
          def storage_volumes(credentials, opts={})
            ec2 = new_client( credentials )
            volume_list = (opts and opts[:id]) ? opts[:id] : nil
@@ -582,7 +620,6 @@ module Deltacloud
          end

          private
-
          def new_client(credentials, type = :ec2)
            klass = case type
                      when :elb then Aws::Elb
diff --git a/server/lib/deltacloud/helpers/blob_stream.rb 
b/server/lib/deltacloud/helpers/blob_stream.rb
index 00879a9..355a66a 100644
--- a/server/lib/deltacloud/helpers/blob_stream.rb
+++ b/server/lib/deltacloud/helpers/blob_stream.rb
@@ -17,7 +17,8 @@ include Deltacloud
  begin
    require 'eventmachine'
    #--
-  # based on the example from 
http://macournoyer.com/blog/2009/06/04/pusher-and-async-with-thin/
+  # based on the example from
+  #   http://macournoyer.com/blog/2009/06/04/pusher-and-async-with-thin/
    #--
    class BlobStream
      AsyncResponse = [-1, {}, []].freeze
@@ -32,11 +33,11 @@ begin
          'Content-Disposition' =>  params["content_disposition"],
          'Content-Length' =>  "#{params['content_length']}"}, body]
        }
-      #call the driver from here. the driver method yields for every chunk of 
blob it receives. We then
-      #use body.call to write that chunk as received.
+      #call the driver from here. the driver method yields for every chunk
+      #of blob it receives. Then use body.call to write that chunk as received.
        driver.blob_data(credentials, params[:bucket], params[:blob], params) {|chunk| 
body.call ["#{chunk}"]} #close blob_data block
        body.succeed
-      AsyncResponse # Tells Thin to not close the connection and continue it's 
work on other request
+      AsyncResponse # Tell Thin to not close connection&  work other requests
      end
    end

The last two hunks only change formatting and should really go into a
separate patch.

===> OK, patch 3 in the new series


@@ -69,13 +70,149 @@ class Hash
      remove = []
      self.each_key do |key|
        if key.to_s.match(rgx_pattern)
-         new_key = key.to_s.gsub(rgx_pattern, replacement)
+         new_key = key.to_s.gsub(rgx_pattern, replacement).downcase
           self[new_key] = self[key]
           remove<<  key
-      end #key.match
-    end # each_key do
+      end
+    end
      #remove the original keys
      self.delete_if{|k,v| remove.include?(k)}
-  end #def
+  end
+
+end
+
+#Monkey patch for streaming blobs:
+# Normally a client will upload a blob to deltacloud and thin will put
+# this into a tempfile. Then deltacloud would stream up to the provider:
+#   i.e.  client =-->>TEMP_FILE-->>  deltacloud =-->>STREAM-->>  provider
+# Instead we want to recognise that this is a 'PUT blob' operation and
+# start streaming to the provider as the request is received:
+#   i.e.  client =-->>STREAM-->>  deltacloud =-->>STREAM-->>  provider
+module Thin
+  class Request
+
+    alias_method :move_body_to_tempfile_orig, :move_body_to_tempfile if 
defined?(Thin::Response)

Why does this need to be made conditional? If we are not running under
thin, it doesn't really matter what move_body_to_tempfile does, since
e.g. Webrick will never call it.


===> I uninstalled Thin entirely (i.e. the thin gem) in order to test this code under WEBrick. In that case the unconditional 'alias_method' failed, since Thin::Request.move_body_to_tempfile does not exist (./server/bin/../lib/deltacloud/helpers/blob_stream.rb:94:in `alias_method': undefined method `move_body_to_tempfile' for class `Thin::Request' (NameError)). The guard skips the alias when Thin is not loaded, so the file still loads under WEBrick.


+    private
+      def move_body_to_tempfile
+        if BlobStreamIO::is_put_blob(self)
+          @body = BlobStreamIO.new(self)
+        else
+          move_body_to_tempfile_orig
+        end
+      end
+
+  end
+end
+
+require 'net/http'
+#monkey patch for Net:HTTP
+module Net
+  class HTTP

-end #class
+    alias :request_orig :request
+
+    @blob_req # needs global scope for close op later

That line isn't needed.


Done (sorry, bad habits from my Java days...).


+    def request(req, body = nil, blob_stream = nil,&block)
+      unless blob_stream
+        return request_orig(req, body,&block)
+      end
+      @blob_req = req
+      do_start #start the connection
+
+      req.set_body_internal body
+      begin_transport req
+      req.write_header_m @socket,@curr_http_version, edit_path(req.path)
+      @socket
+    end
+
+    class Put<  HTTPRequest
+      def write_header_m(sock, ver, path)
+        write_header(sock, ver, path)
+      end
+    end
+
+    def end_request
+      begin
+      res = HTTPResponse.read_new(@socket)
+      end while res.kind_of?(HTTPContinue)
+      res.reading_body(@socket, @blob_req.response_body_permitted?) {
+        yield res if block_given?
+       }
+  end_transport @blob_req, res

Spaces, please


OK, done.


+      do_finish
+      res
+    end
+  end
+
+end
+
+require 'base64'
+class BlobStreamIO
+
+  attr_accessor :size, :provider, :sock
+
+  def initialize(request)
+    @client_request = request
+    @size = 0
+    bucket, blob = parse_bucket_blob(request.env["PATH_INFO"])
+    user, password = parse_credentials(request.env['HTTP_AUTHORIZATION'])
+    content_type = request.env['CONTENT_TYPE'] || ""
+    #deal with blob_metadata: (X-Deltacloud-Blobmeta-name: value)
+    user_meta = {}
+    meta_array = request.env.select{|k,v| 
k.match(/^HTTP[-_]X[-_]Deltacloud[-_]Blobmeta[-_]/i)}
+    meta_array.inject({}){ |result, array| user_meta[array.first.upcase] = 
array.last}

The inject here is pointless, since you never change result and
never use the return value. Either use each or something like

         user_meta = meta_array.inject({}){ |res, kv| res[kv.first.upcase] = 
kv.last; res }

Thanks, done.


Also, there's a lot of blob meta handling sprinkled through the code
(here, in the ec2 driver, etc.). Not for this patch, but in general it would be
cleaner to have some helper methods to deal with that. For example, a
helper method 'extract_blob_meta(headers, opts)' that does the above,
and takes an option to rename keys, so that in the ec2_driver you can
call

         headers = extract_blob_meta(request.env, :rename =>
         "x-amz-meta-")

OK, I created a new BlobHelper module in blob_stream.rb (patch 4 in the new series).


+    @content_length = request.env['CONTENT_LENGTH']
+    @http, provider_request = driver.blob_stream_connection({'user'=>user,
+       'password'=>password, 'bucket'=>bucket, 'blob'=>blob, 'metadata'=>  
user_meta,
+       'content_type'=>content_type, 'content_length'=>@content_length })

Minor stylistic nit: it looks a little funky to use string keys here
instead of symbols.

done


+    @content_length = @content_length.to_i #for comparison of size in '<<  
(data)'
+    @sock = @http.request(provider_request, nil, true)
+  end
+
+  def<<  (data)
+    @sock.write(data)
+    @size += data.length
+    if (@size>= @content_length)
+      result = @http.end_request
+      if result.is_a?(Net::HTTPSuccess)
+        @client_request.env["BLOB_SUCCESS"] = "true"
+      else
+        @client_request.env["BLOB_FAIL"] = result.body
+      end
+    end
+  end
+
+  def rewind
+  end
+
+  #use the Request.env hash (populated by the ThinParser) to determine whether
+  #this is a post blob operation. By definition, only get here with a body of
+  #>  112kbytes - thin/lib/thin/request.rb:12 MAX_BODY = 1024 * (80 + 32)
+  def self.is_put_blob(request = nil)
+    path = request.env['PATH_INFO']
+    method = request.env['REQUEST_METHOD']
+    if ( path =~ 
/^#{Regexp.escape(Sinatra::UrlForHelper::DEFAULT_URI_PREFIX)}\/buckets/&&  
method == 'PUT' )
+      return true
+    else
+      return false
+    end
+  end
+
+  private
+
+  def parse_bucket_blob(request_string)
+    array = request_string.split("/")
+    blob = array.pop
+    bucket = array.pop
+    return bucket, blob
+  end
+
+  def parse_credentials(request_string)
+    decoded = Base64.decode64(request_string.split('Basic ').last)
+    key = decoded.split(':').first
+    pass = decoded.split(':').last
+    return key, pass
+  end
+
+end
diff --git a/server/server.rb b/server/server.rb
index e332679..a8f5ee0 100644
--- a/server/server.rb
+++ b/server/server.rb
@@ -696,13 +696,46 @@ get 
"#{Sinatra::UrlForHelper::DEFAULT_URI_PREFIX}/buckets/:bucket/new_blob" do
    end
  end

-#create a new blob
+#create a new blob using PUT - streams through deltacloud
+put "#{Sinatra::UrlForHelper::DEFAULT_URI_PREFIX}/buckets/:bucket/:blob" do
+  if(env["BLOB_SUCCESS"]) #ie got a 200ok after putting blob
+    content_type = env["CONTENT_TYPE"]
+    content_type ||=  ""
+    @blob = driver.blob(credentials, {:id =>  params[:blob],
+                                      'bucket' =>  params[:bucket]})
+    respond_to do |format|
+      format.html { haml :"blobs/show" }
+      format.xml { haml :"blobs/show" }
+      format.json { convert_to_json(:blobs, @blob) }
+    end
+  elsif(env["BLOB_FAIL"])
+    report_error(500) #OK?
+  else # small blobs -<  112kb dont hit the streaming monkey patch - redirect 
to POST rule
+       # also, if running under webrick don't hit the streaming patch (Thin 
specific)
+    path = bucket_url(params[:bucket])
+    status, headers, body = call! env.merge({"PATH_INFO" =>  
"#{Sinatra::UrlForHelper::DEFAULT_URI_PREFIX}/buckets/#{params[:bucket]}",
+                                             "REQUEST_METHOD" =>  "POST",
+                                             "DELTACLOUD_BLOB_NAME" =>  
params[:blob]})

Could we avoid calling the POST rule if we factored the things that both
PUT and POST need into a helper method?

===> Unfortunately not. The two differ in how they obtain the params for the driver.create_blob call: how they get the metadata (HTTP form POST vs. HTTP headers), and how they get the blob data (HTTP form POST vs. data given in rack.input, which we have to write to a tempfile). The only thing the two methods have in common is the call to 'driver.create_blob' once the params are determined. However, to avoid calling POST from PUT (and hence the requirement for Sinatra 1.2.4), I simply put the code directly into the PUT method.



+  end
+end
+
+#create a new blob using html interface - NON STREAMING (i.e. browser POST 
http form data)
  post "#{Sinatra::UrlForHelper::DEFAULT_URI_PREFIX}/buckets/:bucket" do
    bucket_id = params[:bucket]
-  blob_id = params['blob_id']
-  blob_data = params['blob_data']
+  #check if we were passed here from the PUT method - with a small blob 
(<112kb)
+  if(env['DELTACLOUD_BLOB_NAME'])
+    blob_id = env['DELTACLOUD_BLOB_NAME']
+    temp_file = Tempfile.new("temp_blob_file")
+    temp_file.write(env['rack.input'].read)
+    temp_file.flush
+    content_type = env['CONTENT_TYPE'] || ""
+    blob_data = {:tempfile =>  temp_file, :type =>  content_type}
+  else
+    blob_id = params['blob']
+    blob_data = params['blob_data']
+  end

This seems too complicated to me... If we have to jump through hoops to
get to the POST handler, and then through more hoops to have the POST
handler detect that we are coming from PUT, we really should be splitting
stuff out into helper method(s).

See my reply to the previous comment.



    user_meta = {}
-#first try get blob_metadata from params (i.e., passed by http form post, e.g. 
browser)
+  #first try metadata from params (i.e., passed by http form post, e.g. 
browser)
    max = params[:meta_params]
    if(max)
      (1..max.to_i).each do |i|
@@ -710,12 +743,13 @@ post 
"#{Sinatra::UrlForHelper::DEFAULT_URI_PREFIX}/buckets/:bucket" do
        key = "HTTP_X_Deltacloud_Blobmeta_#{key}"
        value = params[:"meta_value#{i}"]
        user_meta[key] = value
-    end #max.each do
-  else #can try to get blob_metadata from http headers
-    meta_array = request.env.select{|k,v| 
k.match(/^HTTP[-_]X[-_]Deltacloud[-_]Blobmeta[-_]/i)}
-    meta_array.inject({}){ |result, array| user_meta[array.first.upcase] = 
array.last}
-  end #end if
+    end
+  else #can try to get blob_metadata from http headers (i.e. from PUT)
+      meta_array = request.env.select{|k,v| 
k.match(/^HTTP[-_]X[-_]Deltacloud[-_]Blobmeta[-_]/i)}
+      meta_array.inject({}){ |result, array| user_meta[array.first.upcase] = 
array.last}

Another potential user of extract_blob_meta ;)

Done, patch 4/4.


+  end
    @blob = driver.create_blob(credentials, bucket_id, blob_id, blob_data, 
user_meta)
+  temp_file.delete if temp_file
    respond_to do |format|
      format.html { haml :"blobs/show"}
      format.xml { haml :"blobs/show" }
@@ -730,7 +764,7 @@ delete 
"#{Sinatra::UrlForHelper::DEFAULT_URI_PREFIX}/buckets/:bucket/:blob" do
    respond_to do |format|
      format.xml {  204 }
      format.json {  204 }
-    format.html { bucket_url(bucket_id) }
+    format.html { redirect(bucket_url(bucket_id)) }
    end
  end

@@ -768,7 +802,7 @@ get 
"#{Sinatra::UrlForHelper::DEFAULT_URI_PREFIX}/buckets/:bucket/:blob" do
      respond_to do |format|
        format.html { haml :"blobs/show" }
        format.xml { haml :"blobs/show" }
-      format.json { convert_to_json(blobs, @blob) }
+      format.json { convert_to_json(:blobs, @blob) }
        end
    else
        report_error(404)
diff --git a/server/views/blobs/new.html.haml b/server/views/blobs/new.html.haml
index a075f0a..bf5c6f5 100644
--- a/server/views/blobs/new.html.haml
+++ b/server/views/blobs/new.html.haml
@@ -3,13 +3,7 @@
  %form{ :action =>  bucket_url(@bucket_id), :method =>  :post, :enctype =>  
'multipart/form-data'}
    %label
      Blob Name:
-    %input{ :name =>  'blob_id', :size =>  512}/
-  %label
-    Blob Data:
-    %br
-    %input{ :type =>  "file", :name =>  'blob_data', :size =>  50}/
-    %br
-    %br
+    %input{ :name =>  'blob', :size =>  512}/
    %input{ :type =>  "hidden", :name =>  "meta_params", :value =>  "0"}
    %a{ :href =>  "javascript:;", :onclick =>  "more_fields();"} Add Metadata
    %div{ :id =>  "metadata_holder", :style =>  "display: none;"}
@@ -23,4 +17,10 @@
    %a{ :href =>  "javascript:;", :onclick =>  "less_fields();"} Less Metadata
    %br
    %br
+  %label
+    Blob Data:
+    %br
+    %input{ :type =>  "file", :name =>  'blob_data', :size =>  50}/
+    %br
+    %br
    %input{ :type =>  :submit, :name =>  "commit", :value =>  "create"}/




Reply via email to