From: marios <[email protected]>

http://mariosandreou.com/deltacloud/cloud_API/2012/10/24/segmenting-huge-blobs-part-2.html

Signed-off-by: marios <[email protected]>
---
 server/lib/deltacloud/drivers/ec2/ec2_driver.rb    | 54 ++++++++++++++++------
 .../drivers/openstack/openstack_driver.rb          | 23 ++++++++-
 2 files changed, 61 insertions(+), 16 deletions(-)

diff --git a/server/lib/deltacloud/drivers/ec2/ec2_driver.rb b/server/lib/deltacloud/drivers/ec2/ec2_driver.rb
index c7bed05..9c10ff8 100644
--- a/server/lib/deltacloud/drivers/ec2/ec2_driver.rb
+++ b/server/lib/deltacloud/drivers/ec2/ec2_driver.rb
@@ -507,33 +507,51 @@ module Deltacloud
         #--
         # Create Blob - NON Streaming way (i.e. was called with POST html multipart form data)
         #--
+        #also called for segmented blobs - as final call with blob manifest
         def create_blob(credentials, bucket_id, blob_id, data = nil, opts = {})
           s3_client = new_client(credentials, :s3)
           #data is a construct with the temporary file created by server @.tempfile
           #also file[:type] will give us the content-type
-          res = nil
-          # File stream needs to be reopened in binary mode for whatever reason
-          file = File::open(data[:tempfile].path, 'rb')
-          #insert ec2-specific header for user metadata ... x-amz-meta-KEY = VALUE
-          BlobHelper::rename_metadata_headers(opts, 'x-amz-meta-')
-          opts["Content-Type"] = data[:type]
-          safely do
-            res = s3_client.interface.put(bucket_id,
-                                        blob_id,
-                                        file,
-                                        opts)
+          if(opts[:segment_manifest])
+            safely do
+              s3_client.interface.complete_multipart(bucket_id, blob_id, opts[:segmented_blob_id], opts[:segment_manifest])
+            end
+          else
+            # File stream needs to be reopened in binary mode
+            file = File::open(data[:tempfile].path, 'rb')
+            #insert ec2-specific header for user metadata ... x-amz-meta-KEY = VALUE
+            BlobHelper::rename_metadata_headers(opts, 'x-amz-meta-')
+            opts["Content-Type"] = data[:type]
+            safely do
+              s3_client.interface.put(bucket_id,
+                                      blob_id,
+                                      file,
+                                      opts)
+            end
           end
           #create a new Blob object and return that
           Blob.new( { :id => blob_id,
                       :bucket => bucket_id,
-                      :content_length => data[:tempfile].length,
-                      :content_type => data[:type],
+                      :content_length => ((data && data[:tempfile]) ? data[:tempfile].length : nil),
+                      :content_type => ((data && data[:type]) ? data[:type] : nil),
                       :last_modified => '',
                       :user_metadata => opts.select{|k,v| k.match(/^x-amz-meta-/i)}
                     }
                   )
         end
 
+        def init_segmented_blob(credentials, opts={})
+          s3_client = new_client(credentials, :s3)
+          safely do
+            s3_client.interface.initiate_multipart(opts[:bucket],opts[:id])
+          end
+
+        end
+
+        def blob_segment_id(request, response)
+          response["etag"].gsub("\"", "")
+        end
+
         #--
         # Delete Blob
         #--
@@ -591,8 +609,16 @@ module Deltacloud
           timestamp = Time.now.httpdate
           string_to_sign =
             "PUT\n\n#{params[:content_type]}\n#{timestamp}\n#{signature_meta_string}/#{params[:bucket]}/#{params[:blob]}"
+          if BlobHelper.segmented_blob_op_type(params[:context]) == "segment"
+            partNumber = BlobHelper.segment_order(params[:context])
+            uploadId = BlobHelper.segmented_blob_id(params[:context])
+            segment_string = "?partNumber=#{partNumber}&uploadId=#{uploadId}"
+            string_to_sign << segment_string
+            request = Net::HTTP::Put.new("/#{params[:blob]}#{segment_string}")
+          else
+            request = Net::HTTP::Put.new("/#{params[:blob]}")
+          end
           auth_string = Aws::Utils::sign(params[:password], string_to_sign)
-          request = Net::HTTP::Put.new("/#{params[:blob]}")
           request['Host'] = "#{params[:bucket]}.#{uri.host}"
           request['Date'] = timestamp
           request['Content-Type'] = params[:content_type]
diff --git a/server/lib/deltacloud/drivers/openstack/openstack_driver.rb b/server/lib/deltacloud/drivers/openstack/openstack_driver.rb
index 04bd409..69d523b 100644
--- a/server/lib/deltacloud/drivers/openstack/openstack_driver.rb
+++ b/server/lib/deltacloud/drivers/openstack/openstack_driver.rb
@@ -263,8 +263,12 @@ module Deltacloud
         def create_blob(credentials, bucket, blob, data, opts={})
           os = new_client(credentials, :buckets)
           safely do
-            BlobHelper.rename_metadata_headers(opts, "X-Object-Meta-")
-            os_blob = os.container(bucket).create_object(blob, {:content_type=> data[:type], :metadata=>opts}, data[:tempfile])
+            if(opts[:segment_manifest]) # finalize a segmented blob upload
+              os_blob = os.container(bucket).create_object(blob, {:manifest=>"#{bucket}/#{opts[:segmented_blob_id]}"})
+            else
+              BlobHelper.rename_metadata_headers(opts, "X-Object-Meta-")
+              os_blob = os.container(bucket).create_object(blob, {:content_type=> data[:type], :metadata=>opts}, data[:tempfile])
+            end
             convert_blob(os_blob, bucket)
           end
         end
@@ -292,8 +296,23 @@ module Deltacloud
           end
         end
 
+        def init_segmented_blob(credentials, opts={})
+          opts[:id]
+        end
+
+        def blob_segment_id(request, response)
+          #could be in http header OR query string:
+          segment_order = BlobHelper.segment_order(request)
+          blob_name = request.env["PATH_INFO"].gsub(/(&\w*=\w*)*$/, "").split("/").pop
+          "#{blob_name}#{segment_order}"
+        end
+
         #params: {:user,:password,:bucket,:blob,:content_type,:content_length,:metadata}
+        #params[:context] holds the request object - for getting to blob segment params
         def blob_stream_connection(params)
+          if BlobHelper.segmented_blob_op_type(params[:context]) == "segment"
+            params[:blob] = "#{params[:blob]}#{BlobHelper.segment_order(params[:context])}"
+          end
           tokens = params[:user].split("+")
           user_name, tenant_name = tokens.first, tokens.last
           #need a client for the auth_token and endpoints
-- 
1.7.11.7
