http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/88f124f9/proton-c/bindings/ruby/lib/core/message.rb ---------------------------------------------------------------------- diff --git a/proton-c/bindings/ruby/lib/core/message.rb b/proton-c/bindings/ruby/lib/core/message.rb index 144990b..0b89016 100644 --- a/proton-c/bindings/ruby/lib/core/message.rb +++ b/proton-c/bindings/ruby/lib/core/message.rb @@ -17,605 +17,617 @@ # under the License. #++ -module Qpid # :nodoc: - - module Proton # :nodoc: +module Qpid::Proton + + # A Message represents an addressable quantity of data. + # + # ==== Message Body + # + # The message body can be set using the #body= method. The message will + # then attempt to determine how exactly to encode the content. + # + # ==== Examples + # + # To create a message for sending: + # + # # send a simple text message + # msg = Qpid::Proton::Message.new + # msg.body = "STATE: update" + # + # # send a binary chunk of data + # data = File.binread("/home/qpid/binfile.tar.gz") + # msg = Qpid::Proton::Message.new + # msg.body = Qpid::Proton::BinaryString.new(data) + # + class Message + + # @private + def proton_send(sender, tag = nil) + dlv = sender.delivery(tag || sender.delivery_tag) + encoded = self.encode + sender.stream(encoded) + sender.advance + dlv.settle if sender.snd_settle_mode == Link::SND_SETTLED + return dlv + end - # A Message represents an addressable quantity of data. - # - # ==== Message Body - # - # The message body can be set using the #body= method. The message will - # then attempt to determine how exactly to encode the content. + # Decodes a message from supplied AMQP data and returns the number + # of bytes consumed. 
# - # ==== Examples - # - # To create a message for sending: + # ==== Options # - # # send a simple text message - # msg = Qpid::Proton::Message.new - # msg.body = "STATE: update" + # * encoded - the encoded data # - # # send a binary chunk of data - # data = File.binread("/home/qpid/binfile.tar.gz") - # msg = Qpid::Proton::Message.new - # msg.body = Qpid::Proton::BinaryString.new(data) - # - class Message + def decode(encoded) + check(Cproton.pn_message_decode(@impl, encoded, encoded.length)) - # Decodes a message from supplied AMQP data and returns the number - # of bytes consumed. - # - # ==== Options - # - # * encoded - the encoded data - # - def decode(encoded) - check(Cproton.pn_message_decode(@impl, encoded, encoded.length)) + post_decode + end - post_decode + def post_decode # :nodoc: + # decode elements from the message + @properties = {} + props = Codec::Data.new(Cproton::pn_message_properties(@impl)) + if props.next + @properties = props.type.get(props) end - - def post_decode # :nodoc: - # decode elements from the message - @properties = {} - props = Qpid::Proton::Data.new(Cproton::pn_message_properties(@impl)) - if props.next - @properties = props.type.get(props) - end - @instructions = nil - insts = Qpid::Proton::Data.new(Cproton::pn_message_instructions(@impl)) - if insts.next - @instructions = insts.type.get(insts) - end - @annotations = nil - annts = Qpid::Proton::Data.new(Cproton::pn_message_annotations(@impl)) - if annts.next - @annotations = annts.type.get(annts) - end - @body = nil - body = Qpid::Proton::Data.new(Cproton::pn_message_body(@impl)) - if body.next - @body = body.type.get(body) - end + @instructions = nil + insts = Codec::Data.new(Cproton::pn_message_instructions(@impl)) + if insts.next + @instructions = insts.type.get(insts) end - - # Encodes the message. 
- def encode - pre_encode - size = 16 - loop do - error, data = Cproton::pn_message_encode(@impl, size) - if error == Qpid::Proton::Error::OVERFLOW - size *= 2 - else - check(error) - return data - end - end + @annotations = nil + annts = Codec::Data.new(Cproton::pn_message_annotations(@impl)) + if annts.next + @annotations = annts.type.get(annts) + end + @body = nil + body = Codec::Data.new(Cproton::pn_message_body(@impl)) + if body.next + @body = body.type.get(body) end + end - def pre_encode # :nodoc: - # encode elements from the message - props = Qpid::Proton::Data.new(Cproton::pn_message_properties(@impl)) - props.clear - Qpid::Proton::Mapping.for_class(@properties.class).put(props, @properties) unless @properties.empty? - insts = Qpid::Proton::Data.new(Cproton::pn_message_instructions(@impl)) - insts.clear - if !@instructions.nil? - mapping = Qpid::Proton::Mapping.for_class(@instructions.class) - mapping.put(insts, @instructions) - end - annts = Qpid::Proton::Data.new(Cproton::pn_message_annotations(@impl)) - annts.clear - if !@annotations.nil? - mapping = Qpid::Proton::Mapping.for_class(@annotations.class) - mapping.put(annts, @annotations, :keys => :SYMBOL) - end - body = Qpid::Proton::Data.new(Cproton::pn_message_body(@impl)) - body.clear - if !@body.nil? - mapping = Qpid::Proton::Mapping.for_class(@body.class) - mapping.put(body, @body) + # Encodes the message. + def encode + pre_encode + size = 16 + loop do + error, data = Cproton::pn_message_encode(@impl, size) + if error == Qpid::Proton::Error::OVERFLOW + size *= 2 + else + check(error) + return data end end + end - # Creates a new +Message+ instance. 
- def initialize - @impl = Cproton.pn_message - ObjectSpace.define_finalizer(self, self.class.finalize!(@impl)) - @properties = {} - @instructions = {} - @annotations = {} - @body = nil + def pre_encode # :nodoc: + # encode elements from the message + props = Codec::Data.new(Cproton::pn_message_properties(@impl)) + props.clear + Codec::Mapping.for_class(@properties.class).put(props, @properties) unless @properties.empty? + insts = Codec::Data.new(Cproton::pn_message_instructions(@impl)) + insts.clear + if !@instructions.nil? + mapping = Codec::Mapping.for_class(@instructions.class) + mapping.put(insts, @instructions) + end + annts = Codec::Data.new(Cproton::pn_message_annotations(@impl)) + annts.clear + if !@annotations.nil? + mapping = Codec::Mapping.for_class(@annotations.class) + mapping.put(annts, @annotations, :keys => :SYMBOL) + end + body = Codec::Data.new(Cproton::pn_message_body(@impl)) + body.clear + if !@body.nil? + mapping = Codec::Mapping.for_class(@body.class) + mapping.put(body, @body) end + end - def to_s - tmp = Cproton.pn_string("") - Cproton.pn_inspect(@impl, tmp) - result = Cproton.pn_string_get(tmp) - Cproton.pn_free(tmp) - return result - end + # Creates a new +Message+ instance. + def initialize + @impl = Cproton.pn_message + ObjectSpace.define_finalizer(self, self.class.finalize!(@impl)) + @properties = {} + @instructions = {} + @annotations = {} + @body = nil + end - # Invoked by garbage collection to clean up resources used - # by the underlying message implementation. - def self.finalize!(impl) # :nodoc: - proc { - Cproton.pn_message_free(impl) - } - end + def to_s + tmp = Cproton.pn_string("") + Cproton.pn_inspect(@impl, tmp) + result = Cproton.pn_string_get(tmp) + Cproton.pn_free(tmp) + return result + end - # Returns the underlying message implementation. - def impl # :nodoc: - @impl - end + # Invoked by garbage collection to clean up resources used + # by the underlying message implementation. 
+ def self.finalize!(impl) # :nodoc: + proc { + Cproton.pn_message_free(impl) + } + end - # Clears the state of the +Message+. This allows a single instance of - # +Message+ to be reused. - # - def clear - Cproton.pn_message_clear(@impl) - @properties.clear unless @properties.nil? - @instructions.clear unless @instructions.nil? - @annotations.clear unless @annotations.nil? - @body = nil - end + # Returns the underlying message implementation. + def impl # :nodoc: + @impl + end - # Returns the most recent error number. - # - def errno - Cproton.pn_message_errno(@impl) - end + # Clears the state of the +Message+. This allows a single instance of + # +Message+ to be reused. + # + def clear + Cproton.pn_message_clear(@impl) + @properties.clear unless @properties.nil? + @instructions.clear unless @instructions.nil? + @annotations.clear unless @annotations.nil? + @body = nil + end - # Returns the most recent error message. - # - def error - Cproton.pn_error_text(Cproton.pn_message_error(@impl)) - end + # Returns the most recent error number. + # + def errno + Cproton.pn_message_errno(@impl) + end - # Returns whether there is currently an error reported. - # - def error? - !Cproton.pn_message_errno(@impl).zero? - end + # Returns the most recent error message. + # + def error + Cproton.pn_error_text(Cproton.pn_message_error(@impl)) + end - # Sets the durable flag. - # - # See ::durable for more details on message durability. - # - # ==== Options - # - # * state - the durable state - # - def durable=(state) - raise TypeError.new("state cannot be nil") if state.nil? - Cproton.pn_message_set_durable(@impl, state) - end + # Returns whether there is currently an error reported. + # + def error? + !Cproton.pn_message_errno(@impl).zero? + end - # Returns the durable property. - # - # The durable property indicates that the emessage should be held durably - # by any intermediaries taking responsibility for the message. 
- # - # ==== Examples - # - # msg = Qpid::Proton::Message.new - # msg.durable = true - # - def durable - Cproton.pn_message_is_durable(@impl) - end + # Sets the durable flag. + # + # See ::durable for more details on message durability. + # + # ==== Options + # + # * state - the durable state + # + def durable=(state) + raise TypeError.new("state cannot be nil") if state.nil? + Cproton.pn_message_set_durable(@impl, state) + end - # Sets the priority. - # - # +NOTE:+ Priority values are limited to the range [0,255]. - # - # ==== Options - # - # * priority - the priority value - # - def priority=(priority) - raise TypeError.new("invalid priority: #{priority}") if priority.nil? || !([Float, Fixnum].include?(priority.class)) - raise RangeError.new("priority out of range: #{priority}") if ((priority > 255) || (priority < 0)) - Cproton.pn_message_set_priority(@impl, priority.floor) - end + # Returns the durable property. + # + # The durable property indicates that the message should be held durably + # by any intermediaries taking responsibility for the message. + # + # ==== Examples + # + # msg = Qpid::Proton::Message.new + # msg.durable = true + # + def durable + Cproton.pn_message_is_durable(@impl) + end - # Returns the priority. - # - def priority - Cproton.pn_message_get_priority(@impl) - end + # Sets the priority. + # + # +NOTE:+ Priority values are limited to the range [0,255]. + # + # ==== Options + # + # * priority - the priority value + # + def priority=(priority) + raise TypeError.new("invalid priority: #{priority}") if priority.nil? || !([Float, Fixnum].include?(priority.class)) + raise RangeError.new("priority out of range: #{priority}") if ((priority > 255) || (priority < 0)) + Cproton.pn_message_set_priority(@impl, priority.floor) + end - # Sets the time-to-live for the message. - # - # ==== Options - # - # * time - the time in milliseconds - # - def ttl=(time) - raise TypeError.new("invalid ttl: #{time}") if time.nil?
|| !([Float, Fixnum].include?(time.class)) - raise RangeError.new("time out of range: #{time}") if ((time < 0)) - Cproton.pn_message_set_ttl(@impl, time.floor) - end + # Returns the priority. + # + def priority + Cproton.pn_message_get_priority(@impl) + end - # Returns the time-to-live, in milliseconds. - # - def ttl - Cproton.pn_message_get_ttl(@impl) - end + # Sets the time-to-live for the message. + # + # ==== Options + # + # * time - the time in milliseconds + # + def ttl=(time) + raise TypeError.new("invalid ttl: #{time}") if time.nil? || !([Float, Fixnum].include?(time.class)) + raise RangeError.new("time out of range: #{time}") if ((time < 0)) + Cproton.pn_message_set_ttl(@impl, time.floor) + end - # Sets whether this is the first time the message was acquired. - # - # See ::first_acquirer? for more details. - # - # ==== Options - # - # * state - true if claiming the message - # - def first_acquirer=(state) - raise TypeError.new("invalid state: #{state}") if state.nil? || !([TrueClass, FalseClass].include?(state.class)) - Cproton.pn_message_set_first_acquirer(@impl, state) - end + # Returns the time-to-live, in milliseconds. + # + def ttl + Cproton.pn_message_get_ttl(@impl) + end - # Sets the delivery count for the message. - # - # See ::delivery_count for more details. - # - # ==== Options - # - # * count - the delivery count - # - def delivery_count=(count) - raise ArgumentError.new("invalid count: #{count}") if count.nil? || !([Float, Fixnum].include?(count.class)) - raise RangeError.new("count out of range: #{count}") if count < 0 - - Cproton.pn_message_set_delivery_count(@impl, count.floor) - end + # Sets whether this is the first time the message was acquired. + # + # See ::first_acquirer? for more details. + # + # ==== Options + # + # * state - true if claiming the message + # + def first_acquirer=(state) + raise TypeError.new("invalid state: #{state}") if state.nil? 
|| !([TrueClass, FalseClass].include?(state.class)) + Cproton.pn_message_set_first_acquirer(@impl, state) + end - # Returns the delivery count for the message. - # - # This is the number of delivery attempts for the given message. - # - def delivery_count - Cproton.pn_message_get_delivery_count(@impl) - end + # Sets the delivery count for the message. + # + # See ::delivery_count for more details. + # + # ==== Options + # + # * count - the delivery count + # + def delivery_count=(count) + raise ArgumentError.new("invalid count: #{count}") if count.nil? || !([Float, Fixnum].include?(count.class)) + raise RangeError.new("count out of range: #{count}") if count < 0 - # Returns whether this is the first acquirer. - # - # - def first_acquirer? - Cproton.pn_message_is_first_acquirer(@impl) - end + Cproton.pn_message_set_delivery_count(@impl, count.floor) + end - # Sets the message id. - # - # ==== Options - # - # * id = the id - # - def id=(id) - Cproton.pn_message_set_id(@impl, id) - end + # Returns the delivery count for the message. + # + # This is the number of delivery attempts for the given message. + # + def delivery_count + Cproton.pn_message_get_delivery_count(@impl) + end - # Returns the message id. - # - def id - Cproton.pn_message_get_id(@impl) - end + # Returns whether this is the first acquirer. + # + # + def first_acquirer? + Cproton.pn_message_is_first_acquirer(@impl) + end - # Sets the user id. - # - # ==== Options - # - # * id - the user id - # - def user_id=(id) - Cproton.pn_message_set_user_id(@impl, id) - end + # Sets the message id. + # + # ==== Options + # + # * id = the id + # + def id=(id) + Cproton.pn_message_set_id(@impl, id) + end - # Returns the user id. - # - def user_id - Cproton.pn_message_get_user_id(@impl) - end + # Returns the message id. + # + def id + Cproton.pn_message_get_id(@impl) + end - # Sets the destination address. 
- # - # ==== Options - # - # * address - the address - # - def address=(address) - Cproton.pn_message_set_address(@impl, address) - end + # Sets the user id. + # + # ==== Options + # + # * id - the user id + # + def user_id=(id) + Cproton.pn_message_set_user_id(@impl, id) + end - # Returns the destination address. - # - def address - Cproton.pn_message_get_address(@impl) - end + # Returns the user id. + # + def user_id + Cproton.pn_message_get_user_id(@impl) + end - # Sets the subject. - # - # ==== Options - # - # * subject - the subject - # - def subject=(subject) - Cproton.pn_message_set_subject(@impl, subject) - end + # Sets the destination address. + # + # ==== Options + # + # * address - the address + # + def address=(address) + Cproton.pn_message_set_address(@impl, address) + end - # Returns the subject - # - def subject - Cproton.pn_message_get_subject(@impl) - end + # Returns the destination address. + # + def address + Cproton.pn_message_get_address(@impl) + end - # Sets the reply-to address. - # - # ==== Options - # - # * address - the reply-to address - # - def reply_to=(address) - Cproton.pn_message_set_reply_to(@impl, address) - end + # Sets the subject. + # + # ==== Options + # + # * subject - the subject + # + def subject=(subject) + Cproton.pn_message_set_subject(@impl, subject) + end - # Returns the reply-to address - # - def reply_to - Cproton.pn_message_get_reply_to(@impl) - end + # Returns the subject + # + def subject + Cproton.pn_message_get_subject(@impl) + end - # Sets the correlation id. - # - # ==== Options - # - # * id - the correlation id - # - def correlation_id=(id) - Cproton.pn_message_set_correlation_id(@impl, id) - end + # Sets the reply-to address. + # + # ==== Options + # + # * address - the reply-to address + # + def reply_to=(address) + Cproton.pn_message_set_reply_to(@impl, address) + end - # Returns the correlation id. 
- # - def correlation_id - Cproton.pn_message_get_correlation_id(@impl) - end + # Returns the reply-to address + # + def reply_to + Cproton.pn_message_get_reply_to(@impl) + end - # Sets the message format. - # - # See MessageFormat for more details on formats. - # - # *Warning:* This method has been deprecated. - # - # ==== Options - # - # * format - the format - # - def format=(format) - raise TypeError.new("invalid message format: #{format}") if (format.nil? || !format.kind_of?(Qpid::Proton::MessageFormat)) - Cproton.pn_message_set_format(@impl, format.value) - end + # Sets the correlation id. + # + # ==== Options + # + # * id - the correlation id + # + def correlation_id=(id) + Cproton.pn_message_set_correlation_id(@impl, id) + end - # Returns the message format - # - # *Warning:* This method has been deprecated. - # - # ==== Note - # - # This method is now deprecated. - # - def format - Qpid::Proton::MessageFormat.by_value(Cproton.pn_message_get_format(@impl)) - end + # Returns the correlation id. + # + def correlation_id + Cproton.pn_message_get_correlation_id(@impl) + end - # Sets the content type. - # - # ==== Options - # - # * content_type - the content type - # - def content_type=(content_type) - Cproton.pn_message_set_content_type(@impl, content_type) - end + # Sets the content type. + # + # ==== Options + # + # * content_type - the content type + # + def content_type=(content_type) + Cproton.pn_message_set_content_type(@impl, content_type) + end - # Returns the content type - # - def content_type - Cproton.pn_message_get_content_type(@impl) - end + # Returns the content type + # + def content_type + Cproton.pn_message_get_content_type(@impl) + end - # Sets the content encoding type. - # - # ==== Options - # - # * encoding - the content encoding - # - def content_encoding=(encoding) - Cproton.pn_message_set_content_encoding(@impl, encoding) - end + # Sets the message content. + # + # *WARNING:* This method has been deprecated. 
Please use #body= instead to + # set the content of a message. + # + # ==== Options + # + # * content - the content + # + def content=(content) + Cproton.pn_message_load(@impl, content) + end - # Returns the content encoding type. - # - def content_encoding - Cproton.pn_message_get_content_encoding(@impl) + # Returns the message content. + # + # *WARNING:* This method has been deprecated. Please use #body instead to + # retrieve the content of a message. + # + def content + size = 16 + loop do + result = Cproton.pn_message_save(@impl, size) + error = result[0] + data = result[1] + if error == Qpid::Proton::Error::OVERFLOW + size = size * 2 + else + check(error) + return data + end end + end - # Sets the expiration time. - # - # ==== Options - # - # * time - the expiry time - # - def expires=(time) - raise TypeError.new("invalid expiry time: #{time}") if time.nil? - raise ArgumentError.new("expiry time cannot be negative: #{time}") if time < 0 - Cproton.pn_message_set_expiry_time(@impl, time) - end + # Sets the content encoding type. + # + # ==== Options + # + # * encoding - the content encoding + # + def content_encoding=(encoding) + Cproton.pn_message_set_content_encoding(@impl, encoding) + end - # Returns the expiration time. - # - def expires - Cproton.pn_message_get_expiry_time(@impl) - end + # Returns the content encoding type. + # + def content_encoding + Cproton.pn_message_get_content_encoding(@impl) + end - # Sets the creation time. - # - # ==== Options - # - # * time - the creation time - # - def creation_time=(time) - raise TypeError.new("invalid time: #{time}") if time.nil? - raise ArgumentError.new("time cannot be negative") if time < 0 - Cproton.pn_message_set_creation_time(@impl, time) - end + # Sets the expiration time. + # + # ==== Options + # + # * time - the expiry time + # + def expires=(time) + raise TypeError.new("invalid expiry time: #{time}") if time.nil? 
+ raise ArgumentError.new("expiry time cannot be negative: #{time}") if time < 0 + Cproton.pn_message_set_expiry_time(@impl, time) + end - # Returns the creation time. - # - def creation_time - Cproton.pn_message_get_creation_time(@impl) - end + # Returns the expiration time. + # + def expires + Cproton.pn_message_get_expiry_time(@impl) + end - # Sets the group id. - # - # ==== Options - # - # * id - the group id - # - def group_id=(id) - Cproton.pn_message_set_group_id(@impl, id) - end + # Sets the creation time. + # + # ==== Options + # + # * time - the creation time + # + def creation_time=(time) + raise TypeError.new("invalid time: #{time}") if time.nil? + raise ArgumentError.new("time cannot be negative") if time < 0 + Cproton.pn_message_set_creation_time(@impl, time) + end - # Returns the group id. - # - def group_id - Cproton.pn_message_get_group_id(@impl) - end + # Returns the creation time. + # + def creation_time + Cproton.pn_message_get_creation_time(@impl) + end - # Sets the group sequence number. - # - # ==== Options - # - # * seq - the sequence number - # - def group_sequence=(seq) - raise TypeError.new("invalid seq: #{seq}") if seq.nil? - Cproton.pn_message_set_group_sequence(@impl, seq) - end + # Sets the group id. + # + # ==== Options + # + # * id - the group id + # + def group_id=(id) + Cproton.pn_message_set_group_id(@impl, id) + end - # Returns the group sequence number. - # - def group_sequence - Cproton.pn_message_get_group_sequence(@impl) - end + # Returns the group id. + # + def group_id + Cproton.pn_message_get_group_id(@impl) + end - # Sets the reply-to group id. - # - # ==== Options - # - # * id - the id - # - def reply_to_group_id=(id) - Cproton.pn_message_set_reply_to_group_id(@impl, id) - end + # Sets the group sequence number. + # + # ==== Options + # + # * seq - the sequence number + # + def group_sequence=(seq) + raise TypeError.new("invalid seq: #{seq}") if seq.nil? 
+ Cproton.pn_message_set_group_sequence(@impl, seq) + end - # Returns the reply-to group id. - # - def reply_to_group_id - Cproton.pn_message_get_reply_to_group_id(@impl) - end + # Returns the group sequence number. + # + def group_sequence + Cproton.pn_message_get_group_sequence(@impl) + end - # Returns the list of property names for associated with this message. - # - # ==== Examples - # - # msg.properties.each do |name| - # end - # - def properties - @properties - end + # Sets the reply-to group id. + # + # ==== Options + # + # * id - the id + # + def reply_to_group_id=(id) + Cproton.pn_message_set_reply_to_group_id(@impl, id) + end - # Replaces the entire set of properties with the specified hash. - # - def properties=(properties) - @properties = properties - end + # Returns the reply-to group id. + # + def reply_to_group_id + Cproton.pn_message_get_reply_to_group_id(@impl) + end - # Assigns the value given to the named property. - # - # ==== Arguments - # - # * name - the property name - # * value - the property value - # - def []=(name, value) - @properties[name] = value - end + # Returns the list of property names for associated with this message. + # + # ==== Examples + # + # msg.properties.each do |name| + # end + # + def properties + @properties + end - # Retrieves the value for the specified property name. If not found, then - # it returns nil. - # - def [](name) - @properties[name] - end + # Replaces the entire set of properties with the specified hash. + # + def properties=(properties) + @properties = properties + end - # Deletes the named property. - # - def delete_property(name) - @properties.delete(name) - end + # Assigns the value given to the named property. + # + # ==== Arguments + # + # * name - the property name + # * value - the property value + # + def []=(name, value) + @properties[name] = value + end - # Returns the instructions for this message. 
- # - def instructions - @instructions - end + # Retrieves the value for the specified property name. If not found, then + # it returns nil. + # + def [](name) + @properties[name] + end - # Assigns instructions to this message. - # - def instructions=(instr) - @instructions = instr - end + # Deletes the named property. + # + def delete_property(name) + @properties.delete(name) + end - # Returns the annotations for this message. - # - def annotations - @annotations - end + # Returns the instructions for this message. + # + def instructions + @instructions + end - # Assigns annotations to this message. - # - def annotations=(annotations) - @annotations = annotations - end + # Assigns instructions to this message. + # + def instructions=(instr) + @instructions = instr + end - # Returns the body property of the message. - # - def body - @body - end + # Returns the annotations for this message. + # + def annotations + @annotations + end - # Assigns a new value to the body of the message. - # - def body=(body) - @body = body - end + # Assigns annotations to this message. + # + def annotations=(annotations) + @annotations = annotations + end - private + # Returns the body property of the message. + # + def body + @body + end - def check(err) # :nodoc: - if err < 0 - raise DataError, "[#{err}]: #{Cproton.pn_message_error(@impl)}" - else - return err - end - end + # Assigns a new value to the body of the message. + # + def body=(body) + @body = body end + private + + def check(err) # :nodoc: + if err < 0 + raise DataError, "[#{err}]: #{Cproton.pn_message_error(@data)}" + else + return err + end + end end end
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/88f124f9/proton-c/bindings/ruby/lib/messenger/filters.rb ---------------------------------------------------------------------- diff --git a/proton-c/bindings/ruby/lib/messenger/filters.rb b/proton-c/bindings/ruby/lib/messenger/filters.rb index 370d017..e2b50bc 100644 --- a/proton-c/bindings/ruby/lib/messenger/filters.rb +++ b/proton-c/bindings/ruby/lib/messenger/filters.rb @@ -17,47 +17,44 @@ # under the License. #++ -module Qpid # :nodoc: +module Qpid::Proton - module Proton # :nodoc: + # @private + module Filters - module Filters - - def self.included(base) - base.class_eval do - extend ClassMethods - end + def self.included(base) + base.class_eval do + extend ClassMethods end + end - module ClassMethods + module ClassMethods - def method_added(method_name) - @@hooked_methods ||= [] - return if @@hooked_methods.include?(method_name) - @@hooked_methods << method_name + def method_added(method_name) + @@hooked_methods ||= [] + return if @@hooked_methods.include?(method_name) + @@hooked_methods << method_name + hooks = @@before_hooks[method_name] + return if hooks.nil? + orig_method = instance_method(method_name) + define_method(method_name) do |*args, &block| hooks = @@before_hooks[method_name] - return if hooks.nil? - orig_method = instance_method(method_name) - define_method(method_name) do |*args, &block| - hooks = @@before_hooks[method_name] - hooks.each do |hook| - method(hook).call - end - - orig_method.bind(self).call(*args, &block) + hooks.each do |hook| + method(hook).call end - end - def call_before(before_method, *methods) - @@before_hooks ||= {} - methods.each do |method| - hooks = @@before_hooks[method] || [] - raise "Repeat filter: #{before_method}" if hooks.include? 
before_method - hooks << before_method - @@before_hooks[method] = hooks - end + orig_method.bind(self).call(*args, &block) end + end + def call_before(before_method, *methods) + @@before_hooks ||= {} + methods.each do |method| + hooks = @@before_hooks[method] || [] + raise "Repeat filter: #{before_method}" if hooks.include? before_method + hooks << before_method + @@before_hooks[method] = hooks + end end end http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/88f124f9/proton-c/bindings/ruby/lib/messenger/messenger.rb ---------------------------------------------------------------------- diff --git a/proton-c/bindings/ruby/lib/messenger/messenger.rb b/proton-c/bindings/ruby/lib/messenger/messenger.rb index 5a16c50..f96a535 100644 --- a/proton-c/bindings/ruby/lib/messenger/messenger.rb +++ b/proton-c/bindings/ruby/lib/messenger/messenger.rb @@ -17,684 +17,684 @@ # under the License. # -module Qpid # :nodoc: - - module Proton # :nodoc: - - # The +Messenger+ class defines a high level interface for - # sending and receiving Messages. Every Messenger contains - # a single logical queue of incoming messages and a single - # logical queue of outgoing messages. These messages in these - # queues may be destined for, or originate from, a variety of - # addresses. - # - # The messenger interface is single-threaded. All methods - # except one ( #interrupt ) are intended to be used from within - # the messenger thread. - # - # === Sending & Receiving Messages - # - # The Messenger class works in conjuction with the Message class. The - # Message class is a mutable holder of message content. - # - # The put method copies its Message to the outgoing queue, and may - # send queued messages if it can do so without blocking. The send - # method blocks until it has sent the requested number of messages, - # or until a timeout interrupts the attempt. 
- # - # Similarly, the recv method receives messages into the incoming - # queue, and may block as it attempts to receive the requested number - # of messages, or until timeout is reached. It may receive fewer - # than the requested number. The get method pops the - # eldest Message off the incoming queue and copies it into the Message - # object that you supply. It will not block. - # - # The blocking attribute allows you to turn off blocking behavior entirely, - # in which case send and recv will do whatever they can without - # blocking, and then return. You can then look at the number - # of incoming and outgoing messages to see how much outstanding work - # still remains. - # - class Messenger - - include Qpid::Proton::ExceptionHandling - - can_raise_exception [:send, :receive, :password=, :start, :stop, - :perform_put, :perform_get, :interrupt, - :route, :rewrite, :accept, :reject, - :incoming_window=, :outgoing_window=] - - # Creates a new +Messenger+. - # - # The +name+ parameter is optional. If one is not provided then - # a unique name is generated. - # - # ==== Options - # - # * name - the name (def. nil) - # - def initialize(name = nil) - @impl = Cproton.pn_messenger(name) - @selectables = {} - ObjectSpace.define_finalizer(self, self.class.finalize!(@impl)) - end +module Qpid::Proton::Messenger + + # The +Messenger+ class defines a high level interface for + # sending and receiving Messages. Every Messenger contains + # a single logical queue of incoming messages and a single + # logical queue of outgoing messages. These messages in these + # queues may be destined for, or originate from, a variety of + # addresses. + # + # The messenger interface is single-threaded. All methods + # except one ( #interrupt ) are intended to be used from within + # the messenger thread. + # + # === Sending & Receiving Messages + # + # The Messenger class works in conjunction with the Message class. The + # Message class is a mutable holder of message content.
+ # + # The put method copies its Message to the outgoing queue, and may + # send queued messages if it can do so without blocking. The send + # method blocks until it has sent the requested number of messages, + # or until a timeout interrupts the attempt. + # + # Similarly, the recv method receives messages into the incoming + # queue, and may block as it attempts to receive the requested number + # of messages, or until timeout is reached. It may receive fewer + # than the requested number. The get method pops the + # eldest Message off the incoming queue and copies it into the Message + # object that you supply. It will not block. + # + # The blocking attribute allows you to turn off blocking behavior entirely, + # in which case send and recv will do whatever they can without + # blocking, and then return. You can then look at the number + # of incoming and outgoing messages to see how much outstanding work + # still remains. + # + class Messenger + + include Qpid::Proton::Util::ErrorHandler + + can_raise_error [:send, :receive, :password=, :start, :stop, + :perform_put, :perform_get, :interrupt, + :route, :rewrite, :accept, :reject, + :incoming_window=, :outgoing_window=] + + # Creates a new +Messenger+. + # + # The +name+ parameter is optional. If one is not provided then + # a unique name is generated. + # + # ==== Options + # + # * name - the name (def. nil) + # + def initialize(name = nil) + @impl = Cproton.pn_messenger(name) + @selectables = {} + ObjectSpace.define_finalizer(self, self.class.finalize!(@impl)) + end - def self.finalize!(impl) # :nodoc: - proc { - Cproton.pn_messenger_free(impl) - } - end + def self.finalize!(impl) # :nodoc: + proc { + Cproton.pn_messenger_free(impl) + } + end - # Returns the name. - # - def name - Cproton.pn_messenger_name(@impl) - end + # Returns the name. 
+ # + def name + Cproton.pn_messenger_name(@impl) + end - # This property contains the password for the Messenger.private_key - # file, or +nil+ if the file is not encrypted. - # - # ==== Arguments - # - # * password - the password - # - def password=(password) - Cproton.pn_messenger_set_password(@impl, password) - end + # This property contains the password for the Messenger.private_key + # file, or +nil+ if the file is not encrypted. + # + # ==== Arguments + # + # * password - the password + # + def password=(password) + Cproton.pn_messenger_set_password(@impl, password) + end - # Returns the password property for the Messenger.private_key file. - # - def password - Cproton.pn_messenger_get_password(@impl) - end + # Returns the password property for the Messenger.private_key file. + # + def password + Cproton.pn_messenger_get_password(@impl) + end - # Sets the timeout period, in milliseconds. - # - # A negative timeout period implies an infinite timeout. - # - # ==== Options - # - # * timeout - the timeout period - # - def timeout=(timeout) - raise TypeError.new("invalid timeout: #{timeout}") if timeout.nil? - Cproton.pn_messenger_set_timeout(@impl, timeout) - end + # Sets the timeout period, in milliseconds. + # + # A negative timeout period implies an infinite timeout. + # + # ==== Options + # + # * timeout - the timeout period + # + def timeout=(timeout) + raise TypeError.new("invalid timeout: #{timeout}") if timeout.nil? + Cproton.pn_messenger_set_timeout(@impl, timeout) + end - # Returns the timeout period - # - def timeout - Cproton.pn_messenger_get_timeout(@impl) - end + # Returns the timeout period + # + def timeout + Cproton.pn_messenger_get_timeout(@impl) + end - # Returns true if blocking mode is enabled. - # - # Enable or disable blocking behavior during message sending - # and receiving. This affects every blocking call, with the - # exception of work(). Currently, the affected calls are - # send, recv, and stop. - def blocking? 
- Cproton.pn_messenger_is_blocking(@impl) - end + # Returns true if blocking mode is enabled. + # + # Enable or disable blocking behavior during message sending + # and receiving. This affects every blocking call, with the + # exception of work(). Currently, the affected calls are + # send, recv, and stop. + def blocking? + Cproton.pn_messenger_is_blocking(@impl) + end - # Sets the blocking mode. - def blocking=(blocking) - Cproton.pn_messenger_set_blocking(@impl, blocking) - end + # Sets the blocking mode. + def blocking=(blocking) + Cproton.pn_messenger_set_blocking(@impl, blocking) + end - # Returns true if passive mode is enabled. - # - def passive? - Cproton.pn_messenger_is_passive(@impl) - end + # Returns true if passive mode is enabled. + # + def passive? + Cproton.pn_messenger_is_passive(@impl) + end - # Turns passive mode on or off. - # - # When set to passive mode, Messenger will not attempt to perform I/O - # operations internally. In this mode it is necesssary to use the - # Selectable type to drive any I/O needed to perform requestioned - # actions. - # - # In this mode Messenger will never block. - # - def passive=(mode) - Cproton.pn_messenger_set_passive(@impl, mode) - end + # Turns passive mode on or off. + # + # When set to passive mode, Messenger will not attempt to perform I/O + # operations internally. In this mode it is necesssary to use the + # Selectable type to drive any I/O needed to perform requestioned + # actions. + # + # In this mode Messenger will never block. + # + def passive=(mode) + Cproton.pn_messenger_set_passive(@impl, mode) + end - def deadline - tstamp = Cproton.pn_messenger_deadline(@impl) - return tstamp / 1000.0 unless tstamp.nil? - end + def deadline + tstamp = Cproton.pn_messenger_deadline(@impl) + return tstamp / 1000.0 unless tstamp.nil? + end - # Reports whether an error occurred. - # - def error? - !Cproton.pn_messenger_errno(@impl).zero? - end + # Reports whether an error occurred. + # + def error? 
+ !Cproton.pn_messenger_errno(@impl).zero? + end - # Returns the most recent error number. - # - def errno - Cproton.pn_messenger_errno(@impl) - end + # Returns the most recent error number. + # + def errno + Cproton.pn_messenger_errno(@impl) + end - # Returns the most recent error message. - # - def error - Cproton.pn_error_text(Cproton.pn_messenger_error(@impl)) - end + # Returns the most recent error message. + # + def error + Cproton.pn_error_text(Cproton.pn_messenger_error(@impl)) + end - # Clears the current error state. - # - def clear_error - error = Cproton.pn_messenger_error(@impl) - unless error.nil? - Cproton.pn_error_clear(error) - end + # Clears the current error state. + # + def clear_error + error = Cproton.pn_messenger_error(@impl) + unless error.nil? + Cproton.pn_error_clear(error) end + end - # Currently a no-op placeholder. - # For future compatibility, do not send or recv messages - # before starting the +Messenger+. - # - def start - Cproton.pn_messenger_start(@impl) - end + # Currently a no-op placeholder. + # For future compatibility, do not send or recv messages + # before starting the +Messenger+. + # + def start + Cproton.pn_messenger_start(@impl) + end - # Stops the +Messenger+, preventing it from sending or receiving - # any more messages. - # - def stop - Cproton.pn_messenger_stop(@impl) - end + # Stops the +Messenger+, preventing it from sending or receiving + # any more messages. + # + def stop + Cproton.pn_messenger_stop(@impl) + end - # Returns true if a Messenger is in the stopped state. - # This function does not block. - # - def stopped? - Cproton.pn_messenger_stopped(@impl) - end + # Returns true if a Messenger is in the stopped state. + # This function does not block. + # + def stopped? + Cproton.pn_messenger_stopped(@impl) + end - # Subscribes the Messenger to messages originating from the - # specified source. The source is an address as specified in the - # Messenger introduction with the following addition. 
If the - # domain portion of the address begins with the '~' character, the - # Messenger will interpret the domain as host/port, bind to it, - # and listen for incoming messages. For example "~0.0.0.0", - # "amqp://~0.0.0.0" will all bind to any local interface and - # listen for incoming messages. An address of "amqps://~0.0.0.0" - # will only permit incoming SSL connections. - # - # ==== Options - # - # * address - the source address to be subscribe - # * timeout - an optional time-to-live value, in seconds, for the - # subscription - # - def subscribe(address, timeout=0) - raise TypeError.new("invalid address: #{address}") if address.nil? - subscription = Cproton.pn_messenger_subscribe_ttl(@impl, address, timeout) - raise Qpid::Proton::ProtonError.new("Subscribe failed") if subscription.nil? - Qpid::Proton::Subscription.new(subscription) - end + # Subscribes the Messenger to messages originating from the + # specified source. The source is an address as specified in the + # Messenger introduction with the following addition. If the + # domain portion of the address begins with the '~' character, the + # Messenger will interpret the domain as host/port, bind to it, + # and listen for incoming messages. For example "~0.0.0.0", + # "amqp://~0.0.0.0" will all bind to any local interface and + # listen for incoming messages. An address of "amqps://~0.0.0.0" + # will only permit incoming SSL connections. + # + # ==== Options + # + # * address - the source address to be subscribe + # * timeout - an optional time-to-live value, in seconds, for the + # subscription + # + def subscribe(address, timeout=0) + raise TypeError.new("invalid address: #{address}") if address.nil? + subscription = Cproton.pn_messenger_subscribe_ttl(@impl, address, timeout) + raise Qpid::Proton::ProtonError.new("Subscribe failed") if subscription.nil? + Subscription.new(subscription) + end - # Path to a certificate file for the +Messenger+. 
- # - # This certificate is used when the +Messenger+ accepts or establishes - # SSL/TLS connections. This property must be specified for the - # Messenger to accept incoming SSL/TLS connections and to establish - # client authenticated outgoing SSL/TLS connection. Non client authenticated - # outgoing SSL/TLS connections do not require this property. - # - # ==== Options - # - # * certificate - the certificate - # - def certificate=(certificate) - Cproton.pn_messenger_set_certificate(@impl, certificate) - end + # Path to a certificate file for the +Messenger+. + # + # This certificate is used when the +Messenger+ accepts or establishes + # SSL/TLS connections. This property must be specified for the + # Messenger to accept incoming SSL/TLS connections and to establish + # client authenticated outgoing SSL/TLS connection. Non client authenticated + # outgoing SSL/TLS connections do not require this property. + # + # ==== Options + # + # * certificate - the certificate + # + def certificate=(certificate) + Cproton.pn_messenger_set_certificate(@impl, certificate) + end - # Returns the path to a certificate file. - # - def certificate - Cproton.pn_messenger_get_certificate(@impl) - end + # Returns the path to a certificate file. + # + def certificate + Cproton.pn_messenger_get_certificate(@impl) + end - # Path to a private key file for the +Messenger+. - # - # The property must be specified for the +Messenger+ to accept incoming - # SSL/TLS connections and to establish client authenticated outgoing - # SSL/TLS connections. Non client authenticated SSL/TLS connections - # do not require this property. - # - # ==== Options - # - # * key - the key file - # - def private_key=(key) - Cproton.pn_messenger_set_private_key(@impl, key) - end + # Path to a private key file for the +Messenger+. + # + # The property must be specified for the +Messenger+ to accept incoming + # SSL/TLS connections and to establish client authenticated outgoing + # SSL/TLS connections. 
Non client authenticated SSL/TLS connections + # do not require this property. + # + # ==== Options + # + # * key - the key file + # + def private_key=(key) + Cproton.pn_messenger_set_private_key(@impl, key) + end - # Returns the path to a private key file. - # - def private_key - Cproton.pn_messenger_get_private_key(@impl) - end + # Returns the path to a private key file. + # + def private_key + Cproton.pn_messenger_get_private_key(@impl) + end - # A path to a database of trusted certificates for use in verifying the - # peer on an SSL/TLS connection. If this property is +nil+, then the - # peer will not be verified. - # - # ==== Options - # - # * certificates - the certificates path - # - def trusted_certificates=(certificates) - Cproton.pn_messenger_set_trusted_certificates(@impl,certificates) - end + # A path to a database of trusted certificates for use in verifying the + # peer on an SSL/TLS connection. If this property is +nil+, then the + # peer will not be verified. + # + # ==== Options + # + # * certificates - the certificates path + # + def trusted_certificates=(certificates) + Cproton.pn_messenger_set_trusted_certificates(@impl,certificates) + end - # The path to the databse of trusted certificates. - # - def trusted_certificates - Cproton.pn_messenger_get_trusted_certificates(@impl) - end + # The path to the databse of trusted certificates. + # + def trusted_certificates + Cproton.pn_messenger_get_trusted_certificates(@impl) + end - # Places the content contained in the message onto the outgoing - # queue of the Messenger. - # - # This method will never block, however it will send any unblocked - # Messages in the outgoing queue immediately and leave any blocked - # Messages remaining in the outgoing queue. - # The send call may then be used to block until the outgoing queue - # is empty. The outgoing attribute may be used to check the depth - # of the outgoing queue. 
- # - # ==== Options - # - # * message - the message - # - def put(message) - raise TypeError.new("invalid message: #{message}") if message.nil? - raise ArgumentError.new("invalid message type: #{message.class}") unless message.kind_of?(Message) - # encode the message first - message.pre_encode - perform_put(message) - return outgoing_tracker + # Places the content contained in the message onto the outgoing + # queue of the Messenger. + # + # This method will never block, however it will send any unblocked + # Messages in the outgoing queue immediately and leave any blocked + # Messages remaining in the outgoing queue. + # The send call may then be used to block until the outgoing queue + # is empty. The outgoing attribute may be used to check the depth + # of the outgoing queue. + # + # ==== Options + # + # * message - the message + # + def put(message) + if message.nil? + raise Qpid::Proton::TypeError.new("invalid message: #{message}") + end + unless message.kind_of?(Qpid::Proton::Message) + raise Qpid::Proton::ArgumentError.new("invalid message type: #{message.class}") end + # encode the message first + message.pre_encode + perform_put(message) + return outgoing_tracker + end - private + private - def perform_put(message) # :nodoc: - Cproton.pn_messenger_put(@impl, message.impl) - end + def perform_put(message) # :nodoc: + Cproton.pn_messenger_put(@impl, message.impl) + end - public + public - # This call will block until the indicated number of messages - # have been sent, or until the operation times out. - # If n is -1 this call will block until all outgoing messages - # have been sent. If n is 0 then this call will send whatever - # it can without blocking. - # - def send(n = -1) - Cproton.pn_messenger_send(@impl, n) - end + # This call will block until the indicated number of messages + # have been sent, or until the operation times out. + # If n is -1 this call will block until all outgoing messages + # have been sent. 
If n is 0 then this call will send whatever + # it can without blocking. + # + def send(n = -1) + Cproton.pn_messenger_send(@impl, n) + end - # Moves the message from the head of the incoming message queue into - # the supplied message object. Any content in the supplied message - # will be overwritten. - # A tracker for the incoming Message is returned. The tracker can - # later be used to communicate your acceptance or rejection of the - # Message. - # - # If no message is provided in the argument, then one is created. In - # either case, the one returned will be the fetched message. - # - # ==== Options - # - # * msg - the (optional) +Message+ instance to be used - # - def get(msg = nil) + # Moves the message from the head of the incoming message queue into + # the supplied message object. Any content in the supplied message + # will be overwritten. + # A tracker for the incoming Message is returned. The tracker can + # later be used to communicate your acceptance or rejection of the + # Message. + # + # If no message is provided in the argument, then one is created. In + # either case, the one returned will be the fetched message. + # + # ==== Options + # + # * msg - the (optional) +Message+ instance to be used + # + def get(msg = nil) + msg_impl = nil + if msg.nil? then msg_impl = nil - if msg.nil? then - msg_impl = nil - else - msg_impl = msg.impl - end - perform_get(msg_impl) - msg.post_decode unless msg.nil? - return incoming_tracker + else + msg_impl = msg.impl end + perform_get(msg_impl) + msg.post_decode unless msg.nil? + return incoming_tracker + end - private + private - def perform_get(msg) # :nodoc: - Cproton.pn_messenger_get(@impl, msg) - end + def perform_get(msg) # :nodoc: + Cproton.pn_messenger_get(@impl, msg) + end - public - - # Receives up to limit messages into the incoming queue. If no value - # for limit is supplied, this call will receive as many messages as it - # can buffer internally. 
If the Messenger is in blocking mode, this - # call will block until at least one Message is available in the - # incoming queue. - # - # Options ==== - # - # * limit - the maximum number of messages to receive - # - def receive(limit = -1) - Cproton.pn_messenger_recv(@impl, limit) - end + public - # Returns true if the messenger is currently receiving data. - def receiving? - Cproton.pn_messenger_receiving(@impl) - end + # Receives up to limit messages into the incoming queue. If no value + # for limit is supplied, this call will receive as many messages as it + # can buffer internally. If the Messenger is in blocking mode, this + # call will block until at least one Message is available in the + # incoming queue. + # + # Options ==== + # + # * limit - the maximum number of messages to receive + # + def receive(limit = -1) + Cproton.pn_messenger_recv(@impl, limit) + end - # Attempts interrupting of the messenger thread. - # - # The Messenger interface is single-threaded, and this is the only - # function intended to be called from outside of is thread. - # - # Call this from a non-Messenger thread to interrupt it while it - # is blocking. This will cause a ::InterruptError to be raised. - # - # If there is no currently blocking call, then the next blocking - # call will be affected, even if it is within the same thread that - # originated the interrupt. - # - def interrupt - Cproton.pn_messenger_interrupt(@impl) - end + # Returns true if the messenger is currently receiving data. + def receiving? + Cproton.pn_messenger_receiving(@impl) + end - # Sends or receives any outstanding messages queued for a Messenger. - # - # This will block for the indicated timeout. This method may also do I/O - # other than sending and receiving messages. For example, closing - # connections after stop() has been called. 
- # - def work(timeout=-1) - err = Cproton.pn_messenger_work(@impl, timeout) - if (err == Cproton::PN_TIMEOUT) then - return false - else - check_for_error(err) - return true - end - end + # Attempts interrupting of the messenger thread. + # + # The Messenger interface is single-threaded, and this is the only + # function intended to be called from outside of is thread. + # + # Call this from a non-Messenger thread to interrupt it while it + # is blocking. This will cause a ::InterruptError to be raised. + # + # If there is no currently blocking call, then the next blocking + # call will be affected, even if it is within the same thread that + # originated the interrupt. + # + def interrupt + Cproton.pn_messenger_interrupt(@impl) + end - # Returns the number messages in the outgoing queue that have not been - # transmitted. - # - def outgoing - Cproton.pn_messenger_outgoing(@impl) + # Sends or receives any outstanding messages queued for a Messenger. + # + # This will block for the indicated timeout. This method may also do I/O + # other than sending and receiving messages. For example, closing + # connections after stop() has been called. + # + def work(timeout=-1) + err = Cproton.pn_messenger_work(@impl, timeout) + if (err == Cproton::PN_TIMEOUT) then + return false + else + check_for_error(err) + return true end + end - # Returns the number of messages in the incoming queue that have not - # been retrieved. - # - def incoming - Cproton.pn_messenger_incoming(@impl) - end + # Returns the number messages in the outgoing queue that have not been + # transmitted. + # + def outgoing + Cproton.pn_messenger_outgoing(@impl) + end - # Adds a routing rule to the Messenger's internal routing table. - # - # The route procedure may be used to influence how a Messenger will - # internally treat a given address or class of addresses. Every call - # to the route procedure will result in Messenger appending a routing - # rule to its internal routing table. 
- # - # Whenever a Message is presented to a Messenger for delivery, it - # will match the address of this message against the set of routing - # rules in order. The first rule to match will be triggered, and - # instead of routing based on the address presented in the message, - # the Messenger will route based on the address supplied in the rule. - # - # The pattern matching syntax supports two types of matches, a '%' - # will match any character except a '/', and a '*' will match any - # character including a '/'. - # - # A routing address is specified as a normal AMQP address, however it - # may additionally use substitution variables from the pattern match - # that triggered the rule. - # - # ==== Arguments - # - # * pattern - the address pattern - # * address - the target address - # - # ==== Examples - # - # # route messages sent to foo to the destionaty amqp://foo.com - # messenger.route("foo", "amqp://foo.com") - # - # # any message to foobar will be routed to amqp://foo.com/bar - # messenger.route("foobar", "amqp://foo.com/bar") - # - # # any message to bar/<path> will be routed to the same path within - # # the amqp://bar.com domain - # messenger.route("bar/*", "amqp://bar.com/$1") - # - # # route all Message objects over TLS - # messenger.route("amqp:*", "amqps:$1") - # - # # supply credentials for foo - # messenger.route("amqp://foo.com/*", "amqp://user:passw...@foo.com/$1") - # - # # supply credentials for all domains - # messenger.route("amqp://*", "amqp://user:password@$1") - # - # # route all addresses through a single proxy while preserving the - # # original destination - # messenger.route("amqp://%$/*", "amqp://user:password@proxy/$1/$2") - # - # # route any address through a single broker - # messenger.route("*", "amqp://user:password@broker/$1") - # - def route(pattern, address) - Cproton.pn_messenger_route(@impl, pattern, address) - end + # Returns the number of messages in the incoming queue that have not + # been retrieved. 
+ # + def incoming + Cproton.pn_messenger_incoming(@impl) + end - # Similar to #route, except that the destination of - # the Message is determined before the message address is rewritten. - # - # The outgoing address is only rewritten after routing has been - # finalized. If a message has an outgoing address of - # "amqp://0.0.0.0:5678", and a rewriting rule that changes its - # outgoing address to "foo", it will still arrive at the peer that - # is listening on "amqp://0.0.0.0:5678", but when it arrives there, - # the receiver will see its outgoing address as "foo". - # - # The default rewrite rule removes username and password from addresses - # before they are transmitted. - # - # ==== Arguments - # - # * pattern - the outgoing address - # * address - the target address - # - def rewrite(pattern, address) - Cproton.pn_messenger_rewrite(@impl, pattern, address) - end + # Adds a routing rule to the Messenger's internal routing table. + # + # The route procedure may be used to influence how a Messenger will + # internally treat a given address or class of addresses. Every call + # to the route procedure will result in Messenger appending a routing + # rule to its internal routing table. + # + # Whenever a Message is presented to a Messenger for delivery, it + # will match the address of this message against the set of routing + # rules in order. The first rule to match will be triggered, and + # instead of routing based on the address presented in the message, + # the Messenger will route based on the address supplied in the rule. + # + # The pattern matching syntax supports two types of matches, a '%' + # will match any character except a '/', and a '*' will match any + # character including a '/'. + # + # A routing address is specified as a normal AMQP address, however it + # may additionally use substitution variables from the pattern match + # that triggered the rule. 
+ # + # ==== Arguments + # + # * pattern - the address pattern + # * address - the target address + # + # ==== Examples + # + # # route messages sent to foo to the destionaty amqp://foo.com + # messenger.route("foo", "amqp://foo.com") + # + # # any message to foobar will be routed to amqp://foo.com/bar + # messenger.route("foobar", "amqp://foo.com/bar") + # + # # any message to bar/<path> will be routed to the same path within + # # the amqp://bar.com domain + # messenger.route("bar/*", "amqp://bar.com/$1") + # + # # route all Message objects over TLS + # messenger.route("amqp:*", "amqps:$1") + # + # # supply credentials for foo + # messenger.route("amqp://foo.com/*", "amqp://user:passw...@foo.com/$1") + # + # # supply credentials for all domains + # messenger.route("amqp://*", "amqp://user:password@$1") + # + # # route all addresses through a single proxy while preserving the + # # original destination + # messenger.route("amqp://%$/*", "amqp://user:password@proxy/$1/$2") + # + # # route any address through a single broker + # messenger.route("*", "amqp://user:password@broker/$1") + # + def route(pattern, address) + Cproton.pn_messenger_route(@impl, pattern, address) + end - def selectable - impl = Cproton.pn_messenger_selectable(@impl) + # Similar to #route, except that the destination of + # the Message is determined before the message address is rewritten. + # + # The outgoing address is only rewritten after routing has been + # finalized. If a message has an outgoing address of + # "amqp://0.0.0.0:5678", and a rewriting rule that changes its + # outgoing address to "foo", it will still arrive at the peer that + # is listening on "amqp://0.0.0.0:5678", but when it arrives there, + # the receiver will see its outgoing address as "foo". + # + # The default rewrite rule removes username and password from addresses + # before they are transmitted. 
+ # + # ==== Arguments + # + # * pattern - the outgoing address + # * address - the target address + # + def rewrite(pattern, address) + Cproton.pn_messenger_rewrite(@impl, pattern, address) + end - # if we don't have any selectables, then return - return nil if impl.nil? + def selectable + impl = Cproton.pn_messenger_selectable(@impl) - fd = Cproton.pn_selectable_fd(impl) + # if we don't have any selectables, then return + return nil if impl.nil? - selectable = @selectables[fd] - if selectable.nil? - selectable = Selectable.new(self, impl) - @selectables[fd] = selectable - end - return selectable - end + fd = Cproton.pn_selectable_get_fd(impl) - # Returns a +Tracker+ for the message most recently sent via the put - # method. - # - def outgoing_tracker - impl = Cproton.pn_messenger_outgoing_tracker(@impl) - return nil if impl == -1 - Qpid::Proton::Tracker.new(impl) + selectable = @selectables[fd] + if selectable.nil? + selectable = Selectable.new(self, impl) + @selectables[fd] = selectable end + return selectable + end - # Returns a +Tracker+ for the most recently received message. - # - def incoming_tracker - impl = Cproton.pn_messenger_incoming_tracker(@impl) - return nil if impl == -1 - Qpid::Proton::Tracker.new(impl) - end + # Returns a +Tracker+ for the message most recently sent via the put + # method. + # + def outgoing_tracker + impl = Cproton.pn_messenger_outgoing_tracker(@impl) + return nil if impl == -1 + Tracker.new(impl) + end - # Signal the sender that you have acted on the Message - # pointed to by the tracker. If no tracker is supplied, - # then all messages that have been returned by the get - # method are accepted, except those that have already been - # auto-settled by passing beyond your incoming window size. - # - # ==== Options - # - # * tracker - the tracker - # - def accept(tracker = nil) - raise TypeError.new("invalid tracker: #{tracker}") unless tracker.nil? or valid_tracker?(tracker) - if tracker.nil? 
then - tracker = self.incoming_tracker - flag = Cproton::PN_CUMULATIVE - else - flag = 0 - end - Cproton.pn_messenger_accept(@impl, tracker.impl, flag) - end + # Returns a +Tracker+ for the most recently received message. + # + def incoming_tracker + impl = Cproton.pn_messenger_incoming_tracker(@impl) + return nil if impl == -1 + Tracker.new(impl) + end - # Rejects the incoming message identified by the tracker. - # If no tracker is supplied, all messages that have been returned - # by the get method are rejected, except those that have already - # been auto-settled by passing beyond your outgoing window size. - # - # ==== Options - # - # * tracker - the tracker - # - def reject(tracker) - raise TypeError.new("invalid tracker: #{tracker}") unless tracker.nil? or valid_tracker?(tracker) - if tracker.nil? then - tracker = self.incoming_tracker - flag = Cproton::PN_CUMULATIVE - else - flag = 0 - end - Cproton.pn_messenger_reject(@impl, tracker.impl, flag) - end + # Signal the sender that you have acted on the Message + # pointed to by the tracker. If no tracker is supplied, + # then all messages that have been returned by the get + # method are accepted, except those that have already been + # auto-settled by passing beyond your incoming window size. + # + # ==== Options + # + # * tracker - the tracker + # + def accept(tracker = nil) + raise TypeError.new("invalid tracker: #{tracker}") unless tracker.nil? or valid_tracker?(tracker) + if tracker.nil? then + tracker = self.incoming_tracker + flag = Cproton::PN_CUMULATIVE + else + flag = 0 + end + Cproton.pn_messenger_accept(@impl, tracker.impl, flag) + end - # Gets the last known remote state of the delivery associated with - # the given tracker, as long as the Message is still within your - # outgoing window. (Also works on incoming messages that are still - # within your incoming queue. See TrackerStatus for details on the - # values returned. 
- # - # ==== Options - # - # * tracker - the tracker - # - def status(tracker) - raise TypeError.new("invalid tracker: #{tracker}") unless valid_tracker?(tracker) - Qpid::Proton::TrackerStatus.by_value(Cproton.pn_messenger_status(@impl, tracker.impl)) - end + # Rejects the incoming message identified by the tracker. + # If no tracker is supplied, all messages that have been returned + # by the get method are rejected, except those that have already + # been auto-settled by passing beyond your outgoing window size. + # + # ==== Options + # + # * tracker - the tracker + # + def reject(tracker) + raise TypeError.new("invalid tracker: #{tracker}") unless tracker.nil? or valid_tracker?(tracker) + if tracker.nil? then + tracker = self.incoming_tracker + flag = Cproton::PN_CUMULATIVE + else + flag = 0 + end + Cproton.pn_messenger_reject(@impl, tracker.impl, flag) + end - # Frees a Messenger from tracking the status associated - # with a given tracker. If you don't supply a tracker, all - # outgoing messages up to the most recent will be settled. - # - # ==== Options - # - # * tracker - the tracker - # - # ==== Examples - # - def settle(tracker) - raise TypeError.new("invalid tracker: #{tracker}") unless valid_tracker?(tracker) - if tracker.nil? then - tracker = self.incoming_tracker - flag = Cproton::PN_CUMULATIVE - else - flag = 0 - end - Cproton.pn_messenger_settle(@impl, tracker.impl, flag) - end + # Gets the last known remote state of the delivery associated with + # the given tracker, as long as the Message is still within your + # outgoing window. (Also works on incoming messages that are still + # within your incoming queue. See TrackerStatus for details on the + # values returned. + # + # ==== Options + # + # * tracker - the tracker + # + def status(tracker) + raise TypeError.new("invalid tracker: #{tracker}") unless valid_tracker?(tracker) + TrackerStatus.by_value(Cproton.pn_messenger_status(@impl, tracker.impl)) + end - # Sets the incoming window. 
- # - # The Messenger will track the remote status of this many incoming - # deliveries after they have been accepted or rejected. - # - # Messages enter this window only when you take them into your application - # using get(). If your incoming window size is n, and you get n+1 messages - # without explicitly accepting or rejecting the oldest message, then the - # message that passes beyond the edge of the incoming window will be - # assigned the default disposition of its link. - # - # ==== Options - # - # * window - the window size - # - def incoming_window=(window) - raise TypeError.new("invalid window: #{window}") unless valid_window?(window) - Cproton.pn_messenger_set_incoming_window(@impl, window) - end + # Frees a Messenger from tracking the status associated + # with a given tracker. If you don't supply a tracker, all + # outgoing messages up to the most recent will be settled. + # + # ==== Options + # + # * tracker - the tracker + # + # ==== Examples + # + def settle(tracker) + raise TypeError.new("invalid tracker: #{tracker}") unless valid_tracker?(tracker) + if tracker.nil? then + tracker = self.incoming_tracker + flag = Cproton::PN_CUMULATIVE + else + flag = 0 + end + Cproton.pn_messenger_settle(@impl, tracker.impl, flag) + end - # Returns the incoming window. - # - def incoming_window - Cproton.pn_messenger_get_incoming_window(@impl) - end + # Sets the incoming window. + # + # The Messenger will track the remote status of this many incoming + # deliveries after they have been accepted or rejected. + # + # Messages enter this window only when you take them into your application + # using get(). If your incoming window size is n, and you get n+1 messages + # without explicitly accepting or rejecting the oldest message, then the + # message that passes beyond the edge of the incoming window will be + # assigned the default disposition of its link. 
+ # + # ==== Options + # + # * window - the window size + # + def incoming_window=(window) + raise TypeError.new("invalid window: #{window}") unless valid_window?(window) + Cproton.pn_messenger_set_incoming_window(@impl, window) + end - # Sets the outgoing window. - # - # The Messenger will track the remote status of this many outgoing - # deliveries after calling send. - # A Message enters this window when you call the put() method with the - # message. If your outgoing window size is n, and you call put n+1 - # times, status information will no longer be available for the - # first message. - # - # ==== Options - # - # * window - the window size - # - def outgoing_window=(window) - raise TypeError.new("invalid window: #{window}") unless valid_window?(window) - Cproton.pn_messenger_set_outgoing_window(@impl, window) - end + # Returns the incoming window. + # + def incoming_window + Cproton.pn_messenger_get_incoming_window(@impl) + end - # Returns the outgoing window. - # - def outgoing_window - Cproton.pn_messenger_get_outgoing_window(@impl) - end + # Sets the outgoing window. + # + # The Messenger will track the remote status of this many outgoing + # deliveries after calling send. + # A Message enters this window when you call the put() method with the + # message. If your outgoing window size is n, and you call put n+1 + # times, status information will no longer be available for the + # first message. + # + # ==== Options + # + # * window - the window size + # + def outgoing_window=(window) + raise TypeError.new("invalid window: #{window}") unless valid_window?(window) + Cproton.pn_messenger_set_outgoing_window(@impl, window) + end - # Unregisters a selectable object. - def unregister_selectable(fileno) # :nodoc: - @selectables.delete(fileno) - end + # Returns the outgoing window. + # + def outgoing_window + Cproton.pn_messenger_get_outgoing_window(@impl) + end - private + # Unregisters a selectable object. 
+ def unregister_selectable(fileno) # :nodoc: + @selectables.delete(fileno) + end - def valid_tracker?(tracker) - !tracker.nil? && tracker.is_a?(Qpid::Proton::Tracker) - end + private - def valid_window?(window) - !window.nil? && [Float, Fixnum].include?(window.class) - end + def valid_tracker?(tracker) + !tracker.nil? && tracker.is_a?(Tracker) + end + def valid_window?(window) + !window.nil? && [Float, Fixnum].include?(window.class) end end http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/88f124f9/proton-c/bindings/ruby/lib/messenger/selectable.rb ---------------------------------------------------------------------- diff --git a/proton-c/bindings/ruby/lib/messenger/selectable.rb b/proton-c/bindings/ruby/lib/messenger/selectable.rb index 33554cd..36b5761 100644 --- a/proton-c/bindings/ruby/lib/messenger/selectable.rb +++ b/proton-c/bindings/ruby/lib/messenger/selectable.rb @@ -17,108 +17,106 @@ # under the License. #++ -module Qpid # :nodoc: - - module Proton # :nodoc: - - # Selectable enables accessing the underlying file descriptors - # for Messenger. - class Selectable - - include Qpid::Proton::Filters - - call_before :check_is_initialized, - :fileno, :capacity, :pending, :deadline, - :readable, :writable, :expired, - :registered=, :registered? - - def initialize(messenger, impl) # :nodoc: - @messenger = messenger - @impl = impl - @io = nil - @freed = false - end - - # Returns the underlying file descriptor. - # - # This can be used in conjunction with the IO class. - # - def fileno - Cproton.pn_selectable_fd(@impl) - end - - def to_io - @io ||= IO.new(fileno) - end - - # The number of bytes the selectable is capable of consuming. - # - def capacity - Cproton.pn_selectable_capacity(@impl) - end - - # The number of bytes waiting to be written to the file descriptor. - # - def pending - Cproton.pn_selectable_pending(@impl) - end - - # The future expiry time at which control will be returned to the - # selectable. 
- # - def deadline - tstamp = Cproton.pn_selectable_deadline(@impl) - tstamp.nil? ? nil : tstamp / 1000 - end - - def readable - Cproton.pn_selectable_readable(@impl) - end - - def writable - Cproton.pn_selectable_writable(@impl) - end - - def expired? - Cproton.pn_selectable_expired(@impl) - end - - def registered=(registered) - Cproton.pn_selectable_set_registered(@impl, registered) - end - - def registered? - Cproton.pn_selectable_is_registered(@impl) - end - - def terminal? - return true if @impl.nil? - Cproton.pn_selectable_is_terminal(@impl) - end - - def to_s - "fileno=#{self.fileno} registered=#{self.registered?} terminal=#{self.terminal?}" - end - - def free - return if @freed - @freed = true - @messenger.unregister_selectable(fileno) - @io.close unless @io.nil? - Cproton.pn_selectable_free(@impl) - @impl = nil - end - - def freed? # :nodoc: - @freed - end - - private - - def check_is_initialized - raise RuntimeError.new("selectable freed") if @impl.nil? - end +module Qpid::Proton::Messenger + + # Selectable enables accessing the underlying file descriptors + # for Messenger. + # + # @private + class Selectable + + include Qpid::Proton::Filters + + call_before :check_is_initialized, + :fileno, :capacity, :pending, :deadline, + :readable, :writable, :expired, + :registered=, :registered? + + def initialize(messenger, impl) # :nodoc: + @messenger = messenger + @impl = impl + @io = nil + @freed = false + end + + # Returns the underlying file descriptor. + # + # This can be used in conjunction with the IO class. + # + def fileno + Cproton.pn_selectable_fd(@impl) + end + + def to_io + @io ||= IO.new(fileno) + end + + # The number of bytes the selectable is capable of consuming. + # + #def capacity + # Cproton.pn_selectable_capacity(@impl) + #end + + # The number of bytes waiting to be written to the file descriptor. 
+ # + def pending + Cproton.pn_selectable_pending(@impl) + end + + # The future expiry time at which control will be returned to the + # selectable. + # + def deadline + tstamp = Cproton.pn_selectable_deadline(@impl) + tstamp.nil? ? nil : tstamp / 1000 + end + + def readable + Cproton.pn_selectable_readable(@impl) + end + + def writable + Cproton.pn_selectable_writable(@impl) + end + + def expired? + Cproton.pn_selectable_expired(@impl) + end + + def registered=(registered) + Cproton.pn_selectable_set_registered(@impl, registered) + end + + def registered? + Cproton.pn_selectable_is_registered(@impl) + end + + def terminal? + return true if @impl.nil? + Cproton.pn_selectable_is_terminal(@impl) + end + + def to_s + "fileno=#{self.fileno} registered=#{self.registered?} terminal=#{self.terminal?}" + end + + def free + return if @freed + @freed = true + @messenger.unregister_selectable(fileno) + @io.close unless @io.nil? + Cproton.pn_selectable_free(@impl) + @impl = nil + end + + def freed? # :nodoc: + @freed + end + + private + def check_is_initialized + raise RuntimeError.new("selectable freed") if @impl.nil? end end http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/88f124f9/proton-c/bindings/ruby/lib/messenger/subscription.rb ---------------------------------------------------------------------- diff --git a/proton-c/bindings/ruby/lib/messenger/subscription.rb b/proton-c/bindings/ruby/lib/messenger/subscription.rb index 21d9281..6d4973e 100644 --- a/proton-c/bindings/ruby/lib/messenger/subscription.rb +++ b/proton-c/bindings/ruby/lib/messenger/subscription.rb @@ -17,23 +17,19 @@ # under the License. #++ -module Qpid # :nodoc: +module Qpid::Proton::Messenger - module Proton # :nodoc: + # A +Subscription+ is an opaque object for working with a +Messenger+'s + # subscriptions. + # + class Subscription - # A +Subscription+ is an opaque object for working with a +Messenger+'s - # subscriptions. 
- # - class Subscription - - def initialize(impl) # :nodoc: - @impl = impl - end - - def impl # :nodoc: - @impl - end + def initialize(impl) # :nodoc: + @impl = impl + end + def impl # :nodoc: + @impl end end http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/88f124f9/proton-c/bindings/ruby/lib/messenger/tracker.rb ---------------------------------------------------------------------- diff --git a/proton-c/bindings/ruby/lib/messenger/tracker.rb b/proton-c/bindings/ruby/lib/messenger/tracker.rb index 7de271a..55507e5 100644 --- a/proton-c/bindings/ruby/lib/messenger/tracker.rb +++ b/proton-c/bindings/ruby/lib/messenger/tracker.rb @@ -17,24 +17,20 @@ # under the License. #++ -module Qpid # :nodoc: +module Qpid::Proton::Messenger - module Proton # :nodoc: + # A +Tracker+ is used to track the disposition of a +Message+. + # + class Tracker - # A +Tracker+ is used to track the disposition of a +Message+. - # - class Tracker + CUMULATIVE = Cproton::PN_CUMULATIVE - CUMULATIVE = Cproton::PN_CUMULATIVE - - def initialize(impl) # :nodoc: - @impl = impl - end - - def impl # :nodoc: - @impl - end + def initialize(impl) # :nodoc: + @impl = impl + end + def impl # :nodoc: + @impl end end --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org