PROTON-1537: [ruby] Support for multiple handlers. Moved old-style handler back to Qpid::Proton::Handler::MessagingHandler. Added new-style handler in Qpid::Proton::MessagingHandler. Handler classes indicate the appropriate Adapter class in a well-known constant.
The new-style handler adapter is not yet implemented, coming soon. Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/cf4a3f6c Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/cf4a3f6c Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/cf4a3f6c Branch: refs/heads/master Commit: cf4a3f6c248f9e3ba733e24ec4861adb92544d57 Parents: 72074d4 Author: Alan Conway <acon...@redhat.com> Authored: Sat Dec 9 15:36:36 2017 -0500 Committer: Alan Conway <acon...@redhat.com> Committed: Wed Dec 13 13:16:48 2017 -0500 ---------------------------------------------------------------------- .../bindings/ruby/lib/core/connection_driver.rb | 6 +- proton-c/bindings/ruby/lib/core/container.rb | 3 +- .../bindings/ruby/lib/core/messaging_handler.rb | 262 ++++++++++--------- proton-c/bindings/ruby/lib/handler/adapter.rb | 157 +++-------- .../ruby/lib/handler/messaging_handler.rb | 160 +++++++++++ .../ruby/lib/handler/old_messaging_adapter.rb | 151 +++++++++++ proton-c/bindings/ruby/lib/qpid_proton.rb | 9 +- proton-c/bindings/ruby/tests/test_adapter.rb | 32 +-- 8 files changed, 516 insertions(+), 264 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cf4a3f6c/proton-c/bindings/ruby/lib/core/connection_driver.rb ---------------------------------------------------------------------- diff --git a/proton-c/bindings/ruby/lib/core/connection_driver.rb b/proton-c/bindings/ruby/lib/core/connection_driver.rb index fe52d75..29bd299 100644 --- a/proton-c/bindings/ruby/lib/core/connection_driver.rb +++ b/proton-c/bindings/ruby/lib/core/connection_driver.rb @@ -167,10 +167,8 @@ module Qpid::Proton # {#dispatch} and {#process} def initialize(io, handler) super(io) - # Allow multiple handlers for backwards compatibility - a = Array(handler) - @handler = a.size > 1 ? 
MessagingHandlers.new(a) : handler - @adapter = Handler::Adapter.try_convert(handler) + @handler = handler + @adapter = Handler::Adapter.adapt(handler) end # @return [MessagingHandler] The handler dispatched to by {#process} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cf4a3f6c/proton-c/bindings/ruby/lib/core/container.rb ---------------------------------------------------------------------- diff --git a/proton-c/bindings/ruby/lib/core/container.rb b/proton-c/bindings/ruby/lib/core/container.rb index dddde03..fff32e6 100644 --- a/proton-c/bindings/ruby/lib/core/container.rb +++ b/proton-c/bindings/ruby/lib/core/container.rb @@ -106,8 +106,7 @@ module Qpid::Proton # Allow ID as sole argument (handler, id = nil, handler.to_str) if (id.nil? && handler.respond_to?(:to_str)) # Allow multiple handlers ofor backwards compatibility - a = Array(handler) - @handler = a.size > 1 ? MessagingHandlers.new(a) : handler + @handler = handler @id = ((id && id.to_s) || SecureRandom.uuid).freeze # Implementation note: http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cf4a3f6c/proton-c/bindings/ruby/lib/core/messaging_handler.rb ---------------------------------------------------------------------- diff --git a/proton-c/bindings/ruby/lib/core/messaging_handler.rb b/proton-c/bindings/ruby/lib/core/messaging_handler.rb index 7a793df..785f730 100644 --- a/proton-c/bindings/ruby/lib/core/messaging_handler.rb +++ b/proton-c/bindings/ruby/lib/core/messaging_handler.rb @@ -24,152 +24,180 @@ module Qpid::Proton # class MessagingHandler - # @overload initialize(opts) - # Create a {MessagingHandler} with options +opts+ - # @option opts [Integer] :prefetch (10) - # The number of messages to fetch in advance, 0 disables prefetch. - # @option opts [Boolean] :auto_accept (true) - # If true, incoming messages are accepted automatically after {#on_message}. 
- # If false, the application can accept, reject or release the message - # by calling methods on {Delivery} when the message has been processed. - # @option opts [Boolean] :auto_settle (true) If true, outgoing - # messages are settled automatically when the remote peer settles. If false, - # the application must call {Delivery#settle} explicitly. - # @option opts [Boolean] :auto_open (true) - # If true, incoming connections are opened automatically. - # If false, the application must call {Connection#open} to open incoming connections. - # @option opts [Boolean] :auto_close (true) - # If true, respond to a remote close automatically with a local close. - # If false, the application must call {Connection#close} to finish closing connections. - # @option opts [Boolean] :peer_close_is_error (false) - # If true, and the remote peer closes the connection without an error condition, - # the set the local error condition {Condition}("error", "unexpected peer close") - # - # @overload initialize(prefetch=10, auto_accept=true, auto_settle=true, peer_close_is_error=false) - # @deprecated use +initialize(opts)+ overload - def initialize(*args) - @options = {} - if args.size == 1 && args[0].is_a?(Hash) - @options.replace(args[0]) - else # Fill options from deprecated fixed arguments - [:prefetch, :auto_accept, :auto_settle, :peer_close_is_error].each do |k| - opts[k] = args.shift unless args.empty? - end - end - # NOTE: the options are processed by {Handler::Adapater} + # Create a {MessagingHandler} + # @option opts [Integer] :prefetch (10) + # The number of messages to fetch in advance, 0 disables prefetch. + # @option opts [Boolean] :auto_accept (true) + # If true, incoming messages are accepted automatically after {#on_message}. + # If false, the application can accept, reject or release the message + # by calling methods on {Delivery} when the message has been processed. 
+ # @option opts [Boolean] :auto_settle (true) If true, outgoing + # messages are settled automatically when the remote peer settles. If false, + # the application must call {Delivery#settle} explicitly. + # @option opts [Boolean] :auto_open (true) + # If true, incoming connections are opened automatically. + # If false, the application must call {Connection#open} to open incoming connections. + # @option opts [Boolean] :auto_close (true) + # If true, respond to a remote close automatically with a local close. + # If false, the application must call {Connection#close} to finish closing connections. + # @option opts [Boolean] :peer_close_is_error (false) + # If true, and the remote peer closes the connection without an error condition, + # the set the local error condition {Condition}("error", "unexpected peer close") + def initialize(opts=nil) + @options = opts && opts.clone end - public - - # @private # @return [Hash] handler options, see {#initialize} attr_reader :options + # @!group Most common events - # @!method on_transport_error(event) - # Called when the transport fails or closes unexpectedly. - # @param event [Event] The event. + # @!method on_container_start(container) + # The container event loop is started + # @param container [Container] The container. - # !@method on_connection_error(event) - # Called when the peer closes the connection with an error condition. - # @param event [Event] The event. + # @!method on_container_stop(container) + # The container event loop is stopped + # @param container [Container] The container. - # @!method on_session_error(event) - # Called when the peer closes the session with an error condition. - # @param event [Qpid:Proton::Event] The event. + # @!method on_message(delivery, message) + # A message is received. + # @param delivery [Delivery] The delivery. + # @param message [Message] The message - # @!method on_link_error(event) - # Called when the peer closes the link with an error condition. 
- # @param event [Event] The event. + # @!method on_sendable(sender) + # A message can be sent + # @param sender [Sender] The sender. - # @!method on_start(event) - # Called when the event loop starts. - # @param event [Event] The event. + # @!endgroup - # @!method on_connection_closed(event) - # Called when the connection is closed. - # @param event [Event] The event. + # @!group Endpoint lifecycle events - # @!method on_session_closed(event) - # Called when the session is closed. - # @param event [Event] The event. + # @!method on_connection_open(connection) + # The remote peer opened the connection + # @param connection - # @!method on_link_closed(event) - # Called when the link is closed. - # @param event [Event] The event. + # @!method on_connection_close(connection) + # The remote peer closed the connection + # @param connection - # @!method on_connection_closing(event) - # Called when the peer initiates the closing of the connection. - # @param event [Event] The event. + # @!method on_connection_error(connection) + # The remote peer closed the connection with an error condition + # @param connection - # @!method on_session_closing(event) - # Called when the peer initiates the closing of the session. - # @param event [Event] The event. + # @!method on_session_open(session) + # The remote peer opened the session + # @param session - # @!method on_link_closing(event) - # Called when the peer initiates the closing of the link. - # @param event [Event] The event. + # @!method on_session_close(session) + # The remote peer closed the session + # @param session - # @!method on_disconnected(event) - # Called when the socket is disconnected. - # @param event [Event] The event. + # @!method on_session_error(session) + # The remote peer closed the session with an error condition + # @param session - # @!method on_sendable(event) - # Called when the sender link has credit and messages can therefore - # be transferred. - # @param event [Event] The event. 
+ # @!method on_sender_open(sender) + # The remote peer opened the sender + # @param sender - # @!method on_accepted(event) - # Called when the remote peer accepts an outgoing message. - # @param event [Event] The event. + # @!method on_sender_detach(sender) + # The remote peer detached the sender + # @param sender - # @!method on_rejected(event) - # Called when the remote peer rejects an outgoing message. - # @param event [Event] The event. + # @!method on_sender_close(sender) + # The remote peer closed the sender + # @param sender - # @!method on_released(event) - # Called when the remote peer releases an outgoing message for re-delivery as-is. - # @param event [Event] The event. + # @!method on_sender_error(sender) + # The remote peer closed the sender with an error condition + # @param sender - # @!method on_modified(event) - # Called when the remote peer releases an outgoing message for re-delivery with modifications. - # @param event [Event] The event. + # @!method on_receiver_open(receiver) + # The remote peer opened the receiver + # @param receiver - # @!method on_settled(event) - # Called when the remote peer has settled hte outgoing message. - # This is the point at which it should never be retransmitted. - # @param event [Event] The event. + # @!method on_receiver_detach(receiver) + # The remote peer detached the receiver + # @param receiver - # @!method on_message(event) - # Called when a message is received. - # - # The message is available from {Event#message}, to accept or reject the message - # use {Event#delivery} - # @param event [Event] The event. + # @!method on_receiver_close(receiver) + # The remote peer closed the receiver + # @param receiver - # @!method on_aborted(event) - # Called when message delivery is aborted by the sender. - # The {Event#delivery} provides information about the delivery, but the message should be ignored. 
+ # @!method on_receiver_error(receiver) + # The remote peer closed the receiver with an error condition + # @param receiver - # @!method on_error(event) - # If +on_xxx_error+ method is missing, {#on_error} is called instead. - # If {#on_error} is missing, the connection is closed with the error. - # @param event [Event] the event, {Event#method} provides the original method name. + # @!endgroup - # @!method on_unhandled(event) - # If an +on_xxx+ method is missing, {#on_unhandled} is called instead. - # @param event [Event] the event, {Event#method} provides the original method name. - end + # @!group Delivery events + + # @!method on_tracker_accept(tracker) + # The receiving end accepted a delivery + # @param tracker [Tracker] The tracker. + + # @!method on_tracker_reject(tracker) + # The receiving end rejected a delivery + # @param tracker [Tracker] The tracker. + + # @!method on_tracker_release(tracker) + # The receiving end released a delivery + # @param tracker [Tracker] The tracker. + + # @!method on_tracker_modify(tracker) + # The receiving end modified a delivery + # @param tracker [Tracker] The tracker. + + # @!method on_tracker_settle(tracker) + # The receiving end settled a delivery + # @param tracker [Tracker] The tracker. + + # @!method on_delivery_settle(delivery) + # The sending end settled a delivery + # @param delivery [Delivery] The delivery. + + # @!endgroup + + # @!group Flow control events + + # @!method on_sender_drain_start(sender) + # The remote end of the sender requested draining + # @param sender [Sender] The sender. + + # @!method on_receiver_drain_finish(receiver) + # The remote end of the receiver completed draining + # @param receiver [Receiver] The receiver. + + # @!endgroup + + # @!group Transport events + + # @!method on_transport_open(transport) + # The underlying network channel opened + # @param transport [Transport] The transport. 
+ + # @!method on_transport_close(transport) + # The underlying network channel closed + # @param transport [Transport] The transport. + + # @!method on_transport_error(transport) + # The underlying network channel is closing due to an error. + # @param transport [Transport] The transport. + + # @!endgroup + + # @!group Unhandled events - # A {MessagingHandler} that delegates events to an array of handlers, in order. - class MessagingHandlers < MessagingHandler - # @param handlers [Array<MessagingHandler>] handler objects - def initialize(handlers) @handlers = handlers; end + # @!method on_error(error_condition) + # The fallback error handler when no specific on_xxx_error is defined + # @param error_condition [Condition] Provides information about the error. - # @return [Array<MessagingHandler>] array of handlers - attr_reader :handlers + # @!method on_unhandled(method_name, *args) + # Called for events with no handler. Similar to ruby's standard #method_ + # @param method_name [Symbol] Name of the event method that would have been called. + # @param args [Array] Arguments that would have been passed - # Dispatch events to each of {#handlers} in turn - def on_unhandled(event) @handlers.each { |h| event.dispatch h }; end + # @!endgroup end end http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cf4a3f6c/proton-c/bindings/ruby/lib/handler/adapter.rb ---------------------------------------------------------------------- diff --git a/proton-c/bindings/ruby/lib/handler/adapter.rb b/proton-c/bindings/ruby/lib/handler/adapter.rb index 25dd8c0..eb712c3 100644 --- a/proton-c/bindings/ruby/lib/handler/adapter.rb +++ b/proton-c/bindings/ruby/lib/handler/adapter.rb @@ -17,140 +17,61 @@ # @private -module Qpid::Proton::Handler +module Qpid::Proton + module Handler - # @private - # Adapter to convert raw proton events to {#MessagingHandler} events - class Adapter - - def self.try_convert(h) h.is_a?(Adapter) ? 
h : Adapter.new(h); end - - def initialize handler - @handler = handler || MessagingHandler.new # Pick up default MH behavior - @opts = (handler.options if handler.respond_to?(:options)) || {} - @opts[:prefetch] ||= 10 - @opts[:peer_close_is_error] = false unless @opts.include? :peer_close_is_error - [:auto_accept, :auto_settle, :auto_open, :auto_close].each do |k| - @opts[k] = true unless @opts.include? k - end - end - - def dispatch(method, event) - (@handler.__send__(method, event); true) if @handler.respond_to? method - end - - def delegate(method, event) - event.method = method # Update the event with the new method - event.dispatch(@handler) || dispatch(:on_unhandled, event) - end - def delegate_error(method, event) - event.method = method - unless event.dispatch(@handler) # Default behaviour if not dispatched - dispatch(:on_error, event) || dispatch(:on_unhandled, event) - event.connection.close event.context.condition # Close the connection by default + class MultiHandler + def self.maybe(h) + a = Array(h) + a.size > 1 ? self.new(h) : h end - end - - # Define repetative on_xxx_open/close methods for each endpoint type - def self.open_close(endpoint) - on_opening = :"on_#{endpoint}_opening" - on_opened = :"on_#{endpoint}_opened" - on_closing = :"on_#{endpoint}_closing" - on_closed = :"on_#{endpoint}_closed" - on_error = :"on_#{endpoint}_error" - - Module.new do - define_method(:"on_#{endpoint}_local_open") do |event| - delegate(on_opened, event) if event.context.remote_open? - end - define_method(:"on_#{endpoint}_remote_open") do |event| - if event.context.local_open? - delegate(on_opened, event) - elsif event.context.local_uninit? 
- delegate(on_opening, event) - event.context.open if @opts[:auto_open] + def initialize(a) + @a = a; + @options = {} + @methods = Set.new + @a.each do |h| + @methods.merge(h.methods.select { |m| m.to_s.start_with?("on_") }) + @options.merge(h.options) do |k, a, b| + raise ArgumentError, "handlers have conflicting option #{k} => #{a} != #{b}" end end + end - define_method(:"on_#{endpoint}_local_close") do |event| - delegate(on_closed, event) if event.context.remote_closed? - end + attr_reader :options - define_method(:"on_#{endpoint}_remote_close") do |event| - if event.context.remote_condition - delegate_error(on_error, event) - elsif event.context.local_closed? - delegate(on_closed, event) - elsif @opts[:peer_close_is_error] - Condition.assign(event.context.__send__(:_remote_condition), "unexpected peer close") - delegate_error(on_error, event) - else - delegate(on_closing, event) - end - event.context.close if @opts[:auto_close] + def method_missing(name, *args) + if respond_to_missing?(name) + @a.each { |h| h.__send__(name, *args) if h.respond_to? name} + else + super end end + def respond_to_missing?(name, private=false); @methods.include?(name); end + def respond_to?(name, all=false) super || respond_to_missing?(name); end # For ruby < 1.9.2 end - # Generate and include open_close modules for each endpoint type - [:connection, :session, :link].each { |endpoint| include open_close(endpoint) } - - def on_transport_error(event) delegate_error(:on_transport_error, event); end - def on_transport_closed(event) delegate(:on_transport_closed, event); end - - # Add flow control for link opening events - def on_link_local_open(event) super; add_credit(event); end - def on_link_remote_open(event) super; add_credit(event); end + # Base adapter + class Adapter + def initialize(h) + @handler = MultiHandler.maybe h + end - def on_delivery(event) - if event.link.receiver? # Incoming message - d = event.delivery - if d.aborted? 
- delegate(:on_aborted, event) - d.settle - elsif d.complete? - if d.link.local_closed? && @opts[:auto_accept] - d.release - else - begin - delegate(:on_message, event) - d.accept if @opts[:auto_accept] - rescue Qpid::Proton::Reject - d.reject - rescue Qpid::Proton::Release - d.release(true) - end - end - end - delegate(:on_settled, event) if d.settled? - add_credit(event) - else # Outgoing message - t = event.tracker - if t.updated? - case t.remote_state - when Qpid::Proton::Delivery::ACCEPTED then delegate(:on_accepted, event) - when Qpid::Proton::Delivery::REJECTED then delegate(:on_rejected, event) - when Qpid::Proton::Delivery::RELEASED then delegate(:on_released, event) - when Qpid::Proton::Delivery::MODIFIED then delegate(:on_modified, event) - end - delegate(:on_settled, event) if t.settled? - t.settle if @opts[:auto_settle] + def self.adapt(h) + if h.respond_to? :proton_event_adapter + a = h.proton_event_adapter + a = a.new(h) if a.is_a? Class + a + else + OldMessagingAdapter.new h end end - end - def on_link_flow(event) - add_credit(event) - l = event.link - delegate(:on_sendable, event) if l.sender? && l.open? && l.credit > 0 - end + # Adapter is already an adapter + def proton_event_adapter() self; end - def add_credit(event) - r = event.receiver - prefetch = @opts[:prefetch] - if r && r.open? && (r.drained == 0) && prefetch && (prefetch > r.credit) - r.flow(prefetch - r.credit) + def dispatch(method, *args) + (@handler.__send__(method, *args); true) if @handler.respond_to? 
method end end end http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cf4a3f6c/proton-c/bindings/ruby/lib/handler/messaging_handler.rb ---------------------------------------------------------------------- diff --git a/proton-c/bindings/ruby/lib/handler/messaging_handler.rb b/proton-c/bindings/ruby/lib/handler/messaging_handler.rb new file mode 100644 index 0000000..0e94c17 --- /dev/null +++ b/proton-c/bindings/ruby/lib/handler/messaging_handler.rb @@ -0,0 +1,160 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + + +module Qpid::Proton + module Handler + + # @deprecated use {Qpid::Proton::MessagingHandler} + class MessagingHandler + + # @overload initialize(opts) + # Create a {MessagingHandler} with options +opts+ + # @option opts [Integer] :prefetch (10) + # The number of messages to fetch in advance, 0 disables prefetch. + # @option opts [Boolean] :auto_accept (true) + # If true, incoming messages are accepted automatically after {#on_message}. + # If false, the application can accept, reject or release the message + # by calling methods on {Delivery} when the message has been processed. + # @option opts [Boolean] :auto_settle (true) If true, outgoing + # messages are settled automatically when the remote peer settles. 
If false, + # the application must call {Delivery#settle} explicitly. + # @option opts [Boolean] :auto_open (true) + # If true, incoming connections are opened automatically. + # If false, the application must call {Connection#open} to open incoming connections. + # @option opts [Boolean] :auto_close (true) + # If true, respond to a remote close automatically with a local close. + # If false, the application must call {Connection#close} to finish closing connections. + # @option opts [Boolean] :peer_close_is_error (false) + # If true, and the remote peer closes the connection without an error condition, + # the set the local error condition {Condition}("error", "unexpected peer close") + # + # @overload initialize(prefetch=10, auto_accept=true, auto_settle=true, peer_close_is_error=false) + # @deprecated use +initialize(opts)+ overload + def initialize(*args) + @options = {} + if args.size == 1 && args[0].is_a?(Hash) + @options.replace(args[0]) + else # Fill options from deprecated fixed arguments + [:prefetch, :auto_accept, :auto_settle, :peer_close_is_error].each do |k| + opts[k] = args.shift unless args.empty? + end + end + # NOTE: the options are processed by {Handler::Adapater} + end + + public + + # @return [Hash] handler options, see {#initialize} + attr_reader :options + + # @!method on_transport_error(event) + # Called when the transport fails or closes unexpectedly. + # @param event [Event] The event. + + # !@method on_connection_error(event) + # Called when the peer closes the connection with an error condition. + # @param event [Event] The event. + + # @!method on_session_error(event) + # Called when the peer closes the session with an error condition. + # @param event [Qpid:Proton::Event] The event. + + # @!method on_link_error(event) + # Called when the peer closes the link with an error condition. + # @param event [Event] The event. + + # @!method on_start(event) + # Called when the event loop starts. + # @param event [Event] The event. 
+ + # @!method on_connection_closed(event) + # Called when the connection is closed. + # @param event [Event] The event. + + # @!method on_session_closed(event) + # Called when the session is closed. + # @param event [Event] The event. + + # @!method on_link_closed(event) + # Called when the link is closed. + # @param event [Event] The event. + + # @!method on_connection_closing(event) + # Called when the peer initiates the closing of the connection. + # @param event [Event] The event. + + # @!method on_session_closing(event) + # Called when the peer initiates the closing of the session. + # @param event [Event] The event. + + # @!method on_link_closing(event) + # Called when the peer initiates the closing of the link. + # @param event [Event] The event. + + # @!method on_disconnected(event) + # Called when the socket is disconnected. + # @param event [Event] The event. + + # @!method on_sendable(event) + # Called when the sender link has credit and messages can therefore + # be transferred. + # @param event [Event] The event. + + # @!method on_accepted(event) + # Called when the remote peer accepts an outgoing message. + # @param event [Event] The event. + + # @!method on_rejected(event) + # Called when the remote peer rejects an outgoing message. + # @param event [Event] The event. + + # @!method on_released(event) + # Called when the remote peer releases an outgoing message for re-delivery as-is. + # @param event [Event] The event. + + # @!method on_modified(event) + # Called when the remote peer releases an outgoing message for re-delivery with modifications. + # @param event [Event] The event. + + # @!method on_settled(event) + # Called when the remote peer has settled hte outgoing message. + # This is the point at which it should never be retransmitted. + # @param event [Event] The event. + + # @!method on_message(event) + # Called when a message is received. 
+ # + # The message is available from {Event#message}, to accept or reject the message + # use {Event#delivery} + # @param event [Event] The event. + + # @!method on_aborted(event) + # Called when message delivery is aborted by the sender. + # The {Event#delivery} provides information about the delivery, but the message should be ignored. + + # @!method on_error(event) + # If +on_xxx_error+ method is missing, {#on_error} is called instead. + # If {#on_error} is missing, the connection is closed with the error. + # @param event [Event] the event, {Event#method} provides the original method name. + + # @!method on_unhandled(event) + # If an +on_xxx+ method is missing, {#on_unhandled} is called instead. + # @param event [Event] the event, {Event#method} provides the original method name. + end + end +end http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cf4a3f6c/proton-c/bindings/ruby/lib/handler/old_messaging_adapter.rb ---------------------------------------------------------------------- diff --git a/proton-c/bindings/ruby/lib/handler/old_messaging_adapter.rb b/proton-c/bindings/ruby/lib/handler/old_messaging_adapter.rb new file mode 100644 index 0000000..c43dc8c --- /dev/null +++ b/proton-c/bindings/ruby/lib/handler/old_messaging_adapter.rb @@ -0,0 +1,151 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. + + +# @private +module Qpid::Proton + module Handler + + # Adapter to convert raw proton events to old {Handler::MessagingHandler} events + class OldMessagingAdapter < Adapter + def initialize handler + super + @opts = (handler.options if handler.respond_to?(:options)) || {} + @opts[:prefetch] ||= 10 + @opts[:peer_close_is_error] = false unless @opts.include? :peer_close_is_error + [:auto_accept, :auto_settle, :auto_open, :auto_close].each do |k| + @opts[k] = true unless @opts.include? k + end + end + + def delegate(method, event) + event.method = method # Update the event with the new method + event.dispatch(@handler) || dispatch(:on_unhandled, event) + end + def delegate_error(method, event) + event.method = method + unless event.dispatch(@handler) # Default behaviour if not dispatched + dispatch(:on_error, event) || dispatch(:on_unhandled, event) + event.connection.close event.context.condition # Close the connection by default + end + end + + # Define repetative on_xxx_open/close methods for each endpoint type + def self.open_close(endpoint) + on_opening = :"on_#{endpoint}_opening" + on_opened = :"on_#{endpoint}_opened" + on_closing = :"on_#{endpoint}_closing" + on_closed = :"on_#{endpoint}_closed" + on_error = :"on_#{endpoint}_error" + + Module.new do + define_method(:"on_#{endpoint}_local_open") do |event| + delegate(on_opened, event) if event.context.remote_open? + end + + define_method(:"on_#{endpoint}_remote_open") do |event| + if event.context.local_open? + delegate(on_opened, event) + elsif event.context.local_uninit? + delegate(on_opening, event) + event.context.open if @opts[:auto_open] + end + end + + define_method(:"on_#{endpoint}_local_close") do |event| + delegate(on_closed, event) if event.context.remote_closed? 
+ end + + define_method(:"on_#{endpoint}_remote_close") do |event| + if event.context.remote_condition + delegate_error(on_error, event) + elsif event.context.local_closed? + delegate(on_closed, event) + elsif @opts[:peer_close_is_error] + Condition.assign(event.context.__send__(:_remote_condition), "unexpected peer close") + delegate_error(on_error, event) + else + delegate(on_closing, event) + end + event.context.close if @opts[:auto_close] + end + end + end + # Generate and include open_close modules for each endpoint type + [:connection, :session, :link].each { |endpoint| include open_close(endpoint) } + + def on_transport_error(event) delegate_error(:on_transport_error, event); end + def on_transport_closed(event) delegate(:on_transport_closed, event); end + + # Add flow control for link opening events + def on_link_local_open(event) super; add_credit(event); end + def on_link_remote_open(event) super; add_credit(event); end + + + def on_delivery(event) + if event.link.receiver? # Incoming message + d = event.delivery + if d.aborted? + delegate(:on_aborted, event) + d.settle + elsif d.complete? + if d.link.local_closed? && @opts[:auto_accept] + d.release + else + begin + delegate(:on_message, event) + d.accept if @opts[:auto_accept] + rescue Qpid::Proton::Reject + d.reject + rescue Qpid::Proton::Release + d.release(true) + end + end + end + delegate(:on_settled, event) if d.settled? + add_credit(event) + else # Outgoing message + t = event.tracker + if t.updated? + case t.remote_state + when Qpid::Proton::Delivery::ACCEPTED then delegate(:on_accepted, event) + when Qpid::Proton::Delivery::REJECTED then delegate(:on_rejected, event) + when Qpid::Proton::Delivery::RELEASED then delegate(:on_released, event) + when Qpid::Proton::Delivery::MODIFIED then delegate(:on_modified, event) + end + delegate(:on_settled, event) if t.settled? 
+ t.settle if @opts[:auto_settle] + end + end + end + + def on_link_flow(event) + add_credit(event) + l = event.link + delegate(:on_sendable, event) if l.sender? && l.open? && l.credit > 0 + end + + def add_credit(event) + r = event.receiver + prefetch = @opts[:prefetch] + if r && r.open? && (r.drained == 0) && prefetch && (prefetch > r.credit) + r.flow(prefetch - r.credit) + end + end + end + end +end http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cf4a3f6c/proton-c/bindings/ruby/lib/qpid_proton.rb ---------------------------------------------------------------------- diff --git a/proton-c/bindings/ruby/lib/qpid_proton.rb b/proton-c/bindings/ruby/lib/qpid_proton.rb index 3af6d39..2d93454 100644 --- a/proton-c/bindings/ruby/lib/qpid_proton.rb +++ b/proton-c/bindings/ruby/lib/qpid_proton.rb @@ -93,18 +93,13 @@ require "messenger/messenger" # Handler classes require "handler/adapter" +require "handler/old_messaging_adapter" # Core classes that depend on Handler require "core/messaging_handler" require "core/container" require "core/connection_driver" # Backwards compatibility shims - require "reactor/container" - -module Qpid::Proton::Handler - # @deprecated alias for backwards compatibility - MessagingHandler = Qpid::Proton::MessagingHandler -end - +require "handler/messaging_handler" http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cf4a3f6c/proton-c/bindings/ruby/tests/test_adapter.rb ---------------------------------------------------------------------- diff --git a/proton-c/bindings/ruby/tests/test_adapter.rb b/proton-c/bindings/ruby/tests/test_adapter.rb index c90f788..50f46c3 100644 --- a/proton-c/bindings/ruby/tests/test_adapter.rb +++ b/proton-c/bindings/ruby/tests/test_adapter.rb @@ -21,25 +21,25 @@ require 'qpid_proton' require 'test_tools' include Qpid::Proton -# Tests with Mock handler that handles all methods. 
-class TestAllHandler < Minitest::Test - - class AllHandler < MessagingHandler - def initialize(*args) - super(*args) - @calls = [] - end +# Records every call +class AllHandler < MessagingHandler + def initialize(*args) + super(*args) + @calls = [] + end - attr_accessor :calls + attr_accessor :calls - def names; @calls.map { |c| c[0] }; end - def events; @calls.map { |c| c[1] }; end + def names; @calls.map { |c| c[0] }; end + def events; @calls.map { |c| c[1] }; end - def method_missing(name, *args) (/^on_/ =~ name) ? (@calls << [name] + args) : super; end - def respond_to_missing?(name, private=false); (/^on_/ =~ name); end - def respond_to?(name, all=false) super || respond_to_missing?(name); end # For ruby < 1.9.2 - end + def method_missing(name, *args) (/^on_/ =~ name) ? (@calls << [name] + args) : super; end + def respond_to_missing?(name, private=false); (/^on_/ =~ name); end + def respond_to?(name, all=false) super || respond_to_missing?(name); end # For ruby < 1.9.2 +end +# Tests with Mock handler that handles all methods, expect both old and new calls +class TestOldHandler < Minitest::Test def setup @h = [AllHandler.new, AllHandler.new] @ch, @sh = *@h @@ -158,7 +158,7 @@ class TestAllHandler < Minitest::Test end # Test with real handlers that implement a few methods -class TestUnhandled < Minitest::Test +class TestOldUnhandled < Minitest::Test def test_message handler_class = Class.new(MessagingHandler) do --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org