PROTON-781: Added Container to the Ruby reactive APIs.
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/b8ba5acb Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/b8ba5acb Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/b8ba5acb Branch: refs/heads/PROTON-781-ruby-reactor-apis Commit: b8ba5acb285d8f540dbf2226bb97cb564e8c09ff Parents: aa8222e Author: Darryl L. Pierce <mcpie...@gmail.com> Authored: Thu Feb 26 10:26:11 2015 -0500 Committer: Darryl L. Pierce <mcpie...@gmail.com> Committed: Mon Jun 8 13:57:52 2015 -0400 ---------------------------------------------------------------------- proton-c/bindings/ruby/lib/qpid_proton.rb | 1 + proton-c/bindings/ruby/lib/reactor/container.rb | 272 +++++++++++++++++++ 2 files changed, 273 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b8ba5acb/proton-c/bindings/ruby/lib/qpid_proton.rb ---------------------------------------------------------------------- diff --git a/proton-c/bindings/ruby/lib/qpid_proton.rb b/proton-c/bindings/ruby/lib/qpid_proton.rb index f40c608..ba1e66e 100644 --- a/proton-c/bindings/ruby/lib/qpid_proton.rb +++ b/proton-c/bindings/ruby/lib/qpid_proton.rb @@ -106,6 +106,7 @@ require "reactor/urls" require "reactor/connector" require "reactor/backoff" require "reactor/session_per_connection" +require "reactor/container" module Qpid::Proton # @private http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b8ba5acb/proton-c/bindings/ruby/lib/reactor/container.rb ---------------------------------------------------------------------- diff --git a/proton-c/bindings/ruby/lib/reactor/container.rb b/proton-c/bindings/ruby/lib/reactor/container.rb new file mode 100644 index 0000000..93b49bb --- /dev/null +++ b/proton-c/bindings/ruby/lib/reactor/container.rb @@ -0,0 +1,272 @@ +#-- +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +#++ + +module Qpid::Proton::Reactor + + # @private + class InternalTransactionHandler < Qpid::Proton::Handler::OutgoingMessageHandler + + def initialize + super + end + + def on_settled(event) + if event.delivery.respond_to? :transaction + event.transaction = event.delivery.transaction + event.delivery.transaction.handle_outcome(event) + end + end + + end + + + # A representation of the AMQP concept of a container which, loosely + # speaking, is something that establishes links to or from another + # container on which messages are transferred. + # + # This is an extension to the Reactor classthat adds convenience methods + # for creating instances of Qpid::Proton::Connection, Qpid::Proton::Sender + # and Qpid::Proton::Receiver. + # + # @example + # + class Container < Reactor + + include Qpid::Proton::Util::Reactor + + include Qpid::Proton::Util::UUID + + attr_accessor :container_id + attr_accessor :global_handler + + def initialize(handlers, options = {}) + super(handlers, options) + + # only do the following if we're creating a new instance + if !options.has_key?(:impl) + @ssl = SSLConfig.new + if options[:global_handler] + self.global_handler = GlobalOverrides.new(options[:global_handler]) + else + # very ugly, but using self.global_handler doesn't work in the constructor + ghandler = Reactor.instance_method(:global_handler).bind(self).call + ghandler = GlobalOverrides.new(ghandler) + Reactor.instance_method(:global_handler=).bind(self).call(ghandler) + end + @trigger = nil + @container_id = generate_uuid + end + end + + # Initiates the establishment of an AMQP connection. + # + # @param options [Hash] A hash of named arguments. + # + def connect(options = {}) + conn = self.connection(options[:handler]) + conn.container = self.container_id || generate_uuid + connector = Connector.new(conn) + conn.overrides = connector + if !options[:url].nil? + connector.address = URLs.new([options[:url]]) + elsif !options[:urls].nil? + connector.address = URLs.new(options[:urls]) + elsif !options[:address].nil? + connector.address = URLs.new([Qpid::Proton::URL.new(options[:address])]) + else + raise ArgumentError.new("either :url or :urls or :address required") + end + + connector.heartbeat = options[:heartbeat] if !options[:heartbeat].nil? + if !options[:reconnect].nil? + connector.reconnect = options[:reconnect] + else + connector.reconnect = Backoff.new() + end + + connector.ssl_domain = SessionPerConnection.new # TODO seems this should be configurable + + conn.open + + return conn + end + + def _session(context) + if context.is_a?(Qpid::Proton::URL) + return self._session(self.connect(:url => context)) + elsif context.is_a?(Qpid::Proton::Session) + return context + elsif context.is_a?(Qpid::Proton::Connection) + if context.session_policy? + return context.session_policy.session(context) + else + return self.create_session(context) + end + else + return context.session + end + end + + # Initiates the establishment of a link over which messages can be sent. + # + # @param context [String, URL] The context. + # @param opts [Hash] Additional options. + # @param opts [String, Qpid::Proton::URL] The target address. + # @param opts [String] :source The source address. + # @param opts [Boolean] :dynamic + # @param opts [Object] :handler + # @param opts [Object] :tag_generator The tag generator. + # @param opts [Hash] :options Addtional link options + # + # @return [Sender] The sender. + # + def create_sender(context, opts = {}) + if context.is_a?(::String) + context = Qpid::Proton::URL.new(context) + end + + target = opts[:target] + if context.is_a?(Qpid::Proton::URL) && target.nil? + target = context.path + end + + session = self._session(context) + + sender = session.sender(opts[:name] || + id(session.connection.container, + target, opts[:source])) + sender.source.address = opts[:source] if !opts[:source].nil? + sender.target.address = target if target + sender.handler = opts[:handler] if !opts[:handler].nil? + sender.tag_generator = opts[:tag_generator] if !opts[:tag_gnenerator].nil? + self._apply_link_options(opts[:options], sender) + sender.open + return sender + end + + # Initiates the establishment of a link over which messages can be received. + # + # There are two accepted arguments for the context + # 1. If a Connection is supplied then the link is established using that + # object. The source, and optionally the target, address can be supplied + # 2. If it is a String or a URL then a new Connection is created on which + # the link will be attached. If a path is specified, but not the source + # address, then the path of the URL is used as the target address. + # + # The name will be generated for the link if one is not specified. + # + # @param context [Connection, URL, String] The connection or the address. + # @param opts [Hash] Additional otpions. + # @option opts [String, Qpid::Proton::URL] The source address. + # @option opts [String] :target The target address + # @option opts [String] :name The link name. + # @option opts [Boolean] :dynamic + # @option opts [Object] :handler + # @option opts [Hash] :options Additional link options. + # + # @return [Receiver + # + def create_receiver(context, opts = {}) + if context.is_a?(::String) + context = Qpid::Proton::URL.new(context) + end + + source = opts[:source] + if context.is_a?(Qpid::Proton::URL) && source.nil? + source = context.path + end + + session = self._session(context) + + receiver = session.receiver(opts[:name] || + id(session.connection.container, + source, opts[:target])) + receiver.source.address = source if source + receiver.source.dynamic = true if opts.has_key?(:dynamic) && opts[:dynamic] + receiver.target.address = opts[:target] if !opts[:target].nil? + receiver.handler = opts[:handler] if !opts[:handler].nil? + self._apply_link_options(opts[:options], receiver) + receiver.open + return receiver + end + + def declare_transaction(context, handler = nil, settle_before_discharge = false) + if context.respond_to? :txn_ctl && !context.__send__(:txn_ctl).nil? + class << context + attr_accessor :txn_ctl + end + context.txn_ctl = self.create_sender(context, nil, "txn-ctl", + InternalTransactionHandler.new()) + end + return Transaction.new(context.txn_ctl, handler, settle_before_discharge) + end + + # Initiates a server socket, accepting incoming AMQP connections on the + # interface and port specified. + # + # @param url [] + # @param ssl_domain [] + # + def listen(url, ssl_domain = nil) + url = Qpid::Proton::URL.new(url) + acceptor = self.acceptor(url.host, url.port) + ssl_config = ssl_domain + if ssl_config.nil? && (url.scheme == 'amqps') && @ssl + ssl_config = @ssl.server + end + if !ssl_config.nil? + acceptor.ssl_domain(ssl_config) + end + return acceptor + end + + def do_work(timeout = nil) + self.timeout = timeout unless timeout.nil? + self.process + end + + def id(container, remote, local) + if !local.nil? && !remote.nil? + "#{container}-#{remote}-#{local}" + elsif !local.nil? + "#{container}-#{local}" + elsif !remote.nil? + "#{container}-#{remote}" + else + "#{container}-#{generate_uuid}" + end + end + + def _apply_link_options(options, link) + if !options.nil? && !options.empty? + if !options.is_a?(::List) + options = [Options].flatten + end + + options.each {|option| o.apply(link) if o.test(link)} + end + end + + def to_s + "#{self.class}<@impl=#{Cproton.pni_address_of(@impl)}>" + end + + end + +end --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org