PROTON-1064: [ruby] New Container with native ruby IO

Updated all examples and tests to use new container, old Reactor::Container
still in place, will be removed or re-implemented over the new container.

- Based on ConnectionDriver, wrapper for pn_connection_driver_t
- Native polling with standard ruby IO.select()
- Thread-friendly: no blocking IO in C code, all in ruby
- connect/listen use standard TCPSocket/TCPServer
- connect_with/listen_with can use any type of IO socket object

Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/51cda9d5
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/51cda9d5
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/51cda9d5

Branch: refs/heads/master
Commit: 51cda9d5d50059d1cfe7af57ec4bfb2e2ab1e70e
Parents: 86de6b5
Author: Alan Conway <acon...@redhat.com>
Authored: Tue Oct 31 17:16:08 2017 -0400
Committer: Alan Conway <acon...@redhat.com>
Committed: Tue Nov 7 13:31:51 2017 -0500

----------------------------------------------------------------------
 examples/ruby/README.md | 54 ++--
 examples/ruby/broker.rb | 63 +----
 examples/ruby/client.rb | 25 +-
 examples/ruby/direct_recv.rb | 35 +--
 examples/ruby/direct_send.rb | 37 +--
 examples/ruby/example_test.rb | 125 +++++-----
 examples/ruby/helloworld.rb | 38 +--
 examples/ruby/helloworld_direct.rb | 38 +--
 examples/ruby/server.rb | 39 ++-
 examples/ruby/simple_recv.rb | 36 +--
 examples/ruby/simple_send.rb | 33 +--
 proton-c/bindings/ruby/CMakeLists.txt | 4 +-
 proton-c/bindings/ruby/lib/core/connection.rb | 67 +++--
 .../bindings/ruby/lib/core/connection_driver.rb | 99 ++++----
 proton-c/bindings/ruby/lib/core/container.rb | 249 +++++++++++++++++++
 proton-c/bindings/ruby/lib/core/endpoint.rb | 11 +-
 proton-c/bindings/ruby/lib/core/listener.rb | 110 ++++++++
 proton-c/bindings/ruby/lib/core/session.rb | 2 +-
 proton-c/bindings/ruby/lib/core/transport.rb | 32 +--
 proton-c/bindings/ruby/lib/core/uri.rb | 50 ++++
 proton-c/bindings/ruby/lib/core/url.rb | 5 +
 proton-c/bindings/ruby/lib/event/event.rb | 7 +-
 .../ruby/lib/handler/endpoint_state_handler.rb | 6 +-
 proton-c/bindings/ruby/lib/qpid_proton.rb | 8 +
 proton-c/bindings/ruby/lib/reactor/reactor.rb | 1 +
 proton-c/bindings/ruby/lib/util/condition.rb | 35 ++-
 proton-c/bindings/ruby/lib/util/engine.rb | 4 +-
 proton-c/bindings/ruby/lib/util/uri.rb | 27 --
 .../ruby/tests/test_connection_driver.rb | 6 +-
 proton-c/bindings/ruby/tests/test_container.rb | 155 +++++++++---
 proton-c/bindings/ruby/tests/test_tools.rb | 38 ++-
 31 files changed, 903 insertions(+), 536 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/51cda9d5/examples/ruby/README.md
----------------------------------------------------------------------
diff --git a/examples/ruby/README.md b/examples/ruby/README.md
index 38cc6ba..8526e29 100644
--- a/examples/ruby/README.md
+++ b/examples/ruby/README.md
@@ -1,32 +1,24 @@
-## What Is The Reactor?
-
-A little outside of the scope of this document, but the reactor is an event source for letting an application know about events in the Proton messaging system. With this set of APIs an application can be register handlers that are notified when a connection is created, a message received, or a session closes.
-
-### Handlers
-
-An application creates **handlers**, objects which provide methods through which the reactor notifies the application's components of events and allows them each to handle the ones in which they are interested (see the Chain Of Responsibility design pattern for more on this idea). There are some pre-defined handlers for responding to incoming message events, outgoing message events, data flow and managing the AMQP endpoints. Look in the **Qpid::Proton::Handlers** package for more details on these classes.
-
-## Simple Reactor Examples
+## Simple Examples

 ### The Broker

-The reactor examples come with a sample broker which can be used by other examples and which also works as an example itself.
For now we'll just start up the broker example and tell it to listen on port 8888: +The examples come with a sample broker which can be used by other examples and which also works as an example itself. For now we'll just start up the broker example and tell it to listen on port 8888: ```` -$ ruby ../examples/ruby/reactor/broker.rb --address=0.0.0.0:8888 -Listening on 0.0.0.0:8888 +$ ruby broker.rb amqp://:8888 +Listening on amqp://:8888 ```` This example broker will receive messages, create queues as needed, and deliver messages to endpoints. ### Hello World Using A Broker -Our first example creates an endpoint that sends messages to a queue to which it is subscribed. So it both sends and receives its message in one pass. +Our first example creates an endpoint that sends messages to a queue to which it is subscribed. So it both sends and receives a message. To start it, simply run: ``` -$ ruby ../examples/ruby/reactor/helloworld.rb --address=0.0.0.0:8888 --queue=examples +$ ruby helloworld.rb //:8888 Hello world! ``` @@ -42,12 +34,13 @@ The following events occur while **helloworld.rb** runs: ### Hello World Without A Broker required -The next example we'll look at will send the classic "Hello world" message to itself directly. This example shows some very fundamental elements of the reactor APIs that you should understand. +The next example we'll look at will send the classic "Hello world" message to itself directly, +without going through a broker. To launch the example: ``` - $ ruby helloworld_direct.rb --address=0.0.0.0:8888/examples + $ ruby helloworld_direct.rb //:9999 Hello world! ``` @@ -58,26 +51,25 @@ The direct version takes on the responsibility for listening to incoming connect * **on_accepted** - Fired when a message is received. * **on_connection_closed** - Fired when an endpoint closes its connection. 
-## More Complex Reactor Examples +## More Complex Examples Now that we've covered the basics with the archetypical hello world app, let's look at some more interesting examples. -There are four example applications that demonstrate how to send and receive messages both directly and through an intermediary, such as a broker: +The following two client examples send and receive messages to an external broker or server: - * **simple_send.rb** - sends messages to a receiver at a specific address and receives responses via an intermediary, - * **simple_recv.rb** - receives messages from via an intermediary, - * **direct_send.rb** - sends messages directly to a receiver and listens for responses itself, and - * **direct_recv.rb** - receives messages directly. + * **simple_send.rb** - connect to a server, send messages to an address + * **simple_recv.rb** - connect to a server, receives messages from an address + +For example: start `broker.rb`; run `simple_send.rb` to send messages to a +broker queue; then `simple_recv.rb` to receive the messages from the broker. - Simple send and direct send may, at first, seem to be so similar that you wonder why they're not just the same applciation. And I know for me I was wonder as I wrote the list above why there were two examples. The reason is that **simple_send.rb** uses the intermediary transfer responses to the messages it sends, while **direct_send.rb** uses an *Acceptor* to listen for an process responses. +The following two examples are *servers* that can be connected to directly, without a broker: - You can use the examples in the follow ways: + * **direct_send.rb** - sends messages directly to a receiver and listens for responses itself, and + * **direct_recv.rb** - receives messages directly. 
- ``` - simple_send.rb -> broker <- simple_recv.rb - simple_send.rb -> direct_recv.rb - direct_send.rb -> simple_recv.rb - ``` +For example if you start `direct_recv.rb`, you can connect to it directly with +`simple_send.rb` vice-versa with `direct_send.rb` and `simple_recv.rb` In this set of examples we see the following event occurring, in addition to what we've seen before: @@ -91,11 +83,11 @@ The way the broker works is to listen to incoming connections, examine the compo The components of the broker example include: * **Broker** - A class that extends the MessagingHandler class. It accepts incoming connections, manages subscribing them to exchanges, and transfers messages between them. - * **Exchange** - A class that represents a message queue, tracking what endpoints are subscribed to it. + * **MessageQueue** - Distributes messages to subscriptions. The Broker manages a map connecting a queue address to the instance of Exchange that holds references to the endpoints of interest. -The broker application demonstrates a new set of reactor events: +The broker application demonstrates a new set of events: * **on_link_opening** - Fired when a remote link is opened but the local end is not yet open. From this event the broker grabs the address and subscribes the link to an exchange for that address. * **on_link_closing** - Fired when a remote link is closed but the local end is still open. From this event the broker grabs the address and unsubscribes the link from that exchange. 
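Note on the broker description in the README hunk above: it says a MessageQueue distributes messages to subscriptions, and the broker.rb change below delivers only to consumers that currently have credit. The following is a minimal, self-contained sketch of that idea in plain Ruby, offered as an illustration rather than as the code in this commit. It does not use qpid_proton: `SketchQueue`, `FakeConsumer`, and `deliver` are hypothetical names invented here, and `FakeConsumer` is a stand-in for a sender link whose credit is tracked by hand.

```ruby
# Hypothetical stand-in for a Proton sender link (NOT part of qpid_proton).
# It only tracks credit and records what was delivered to it.
class FakeConsumer
  attr_reader :credit, :received

  def initialize(credit)
    @credit = credit
    @received = []
  end

  # Consume one unit of credit and remember the message.
  def deliver(message)
    @credit -= 1
    @received << message
  end
end

# Simplified queue (assumption: FIFO order, no dynamic queues).
# Messages wait here until a subscribed consumer has credit.
class SketchQueue
  def initialize
    @messages = []
    @consumers = []
  end

  def subscribe(consumer)
    @consumers << consumer
    dispatch
  end

  # Returns true when nothing is left: no subscribers and no messages.
  # A broker could use this to decide whether to drop the queue.
  def unsubscribe(consumer)
    @consumers.delete(consumer)
    @consumers.empty? && @messages.empty?
  end

  def publish(message)
    @messages << message
    dispatch
  end

  # Hand out queued messages round-robin while any consumer has credit.
  def dispatch
    delivered = true
    while delivered && !@messages.empty?
      delivered = false
      @consumers.each do |c|
        next unless c.credit > 0 && !@messages.empty?
        c.deliver(@messages.shift)
        delivered = true
      end
    end
  end
end
```

In this sketch a message published while no consumer has credit simply stays queued, and it is handed over as soon as a consumer with credit subscribes. In the real broker the equivalent trigger would be a link becoming sendable.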
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/51cda9d5/examples/ruby/broker.rb ---------------------------------------------------------------------- diff --git a/examples/ruby/broker.rb b/examples/ruby/broker.rb index 2c37a39..bd461ed 100644 --- a/examples/ruby/broker.rb +++ b/examples/ruby/broker.rb @@ -21,11 +21,7 @@ require 'qpid_proton' require 'optparse' require 'pathname' -def debug(text) - print "[#{Time.now.strftime('%s')}] #{text}\n" if $options[:debug] -end - -class Exchange +class MessageQueue def initialize(dynamic = false) @dynamic = dynamic @@ -34,30 +30,22 @@ class Exchange end def subscribe(consumer) - debug("subscribing #{consumer}") @consumers << (consumer) - debug(" there are #{@consumers.size} consumers") end def unsubscribe(consumer) - debug("unsubscribing #{consumer}") if @consumers.include?(consumer) @consumers.delete(consumer) - else - debug(" consumer doesn't exist") end - debug(" there are #{@consumers.size} consumers") @consumers.empty? && (@dynamic || @queue.empty?) end def publish(message) - debug("queueing message: #{message.body}") @queue << message self.dispatch end def dispatch(consumer = nil) - debug("dispatching: consumer=#{consumer}") if consumer c = [consumer] else @@ -69,10 +57,8 @@ class Exchange end def deliver_to(consumers) - debug("delivering to #{consumers.size} consumer(s)") result = false consumers.each do |consumer| - debug(" current consumer=#{consumer} credit=#{consumer.credit}") if consumer.credit > 0 && !@queue.empty? 
consumer.send(@queue.pop(true)) result = true @@ -92,32 +78,23 @@ class Broker < Qpid::Proton::Handler::MessagingHandler end def on_start(event) - debug("on_start event") @acceptor = event.container.listen(@url) print "Listening on #{@url}\n" end def queue(address) - debug("fetching queue for #{address}: (there are #{@queues.size} queues)") unless @queues.has_key?(address) - debug(" creating new queue") - @queues[address] = Exchange.new - else - debug(" using existing queue") + @queues[address] = MessageQueue.new end - result = @queues[address] - debug(" returning #{result}") - return result + @queues[address] end def on_link_opening(event) - debug("processing on_link_opening") - debug("link is#{event.link.sender? ? '' : ' not'} a sender") if event.link.sender? if event.link.remote_source.dynamic? address = SecureRandom.uuid event.link.source.address = address - q = Exchange.new(true) + q = MessageQueue.new(true) @queues[address] = q q.subscribe(event.link) elsif event.link.remote_source.address @@ -130,7 +107,6 @@ class Broker < Qpid::Proton::Handler::MessagingHandler end def unsubscribe(link) - debug("unsubscribing #{link.source.address}") if @queues.has_key?(link.source.address) if @queues[link.source.address].unsubscribe(link) @queues.delete(link.source.address) @@ -159,40 +135,21 @@ class Broker < Qpid::Proton::Handler::MessagingHandler end def on_sendable(event) - debug("on_sendable event") q = self.queue(event.link.source.address) - debug(" dispatching #{event.message} to #{q}") q.dispatch(event.link) end def on_message(event) - debug("on_message event") q = self.queue(event.link.target.address) - debug(" dispatching #{event.message} to #{q}") q.publish(event.message) end end -$options = { - :address => "localhost:5672", - :debug => false -} - -OptionParser.new do |opts| - opts.banner = "Usage: #{Pathname.new(__FILE__).basename} [$options]" - - opts.on("-a", "--address=ADDRESS", "Send messages to ADDRESS (def. 
#{$options[:address]}).") do |address| - $options[:address] = address - end - - opts.on("-d", "--debug", "Enable debugging output (def. #{$options[:debug]})") do - $options[:debug] = true - end - -end.parse! - -begin - Qpid::Proton::Reactor::Container.new(Broker.new($options[:address])).run -rescue Interrupt +if ARGV.size != 1 + STDERR.puts "Usage: #{__FILE__} URL +Start an example broker listening on URL" + return 1 end +url, = ARGV +Qpid::Proton::Container.new(Broker.new(url)).run http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/51cda9d5/examples/ruby/client.rb ---------------------------------------------------------------------- diff --git a/examples/ruby/client.rb b/examples/ruby/client.rb index 8c38f38..46c1251 100644 --- a/examples/ruby/client.rb +++ b/examples/ruby/client.rb @@ -22,15 +22,17 @@ require 'optparse' class Client < Qpid::Proton::Handler::MessagingHandler - def initialize(url, requests) + def initialize(url, address, requests) super() @url = url + @address = address @requests = requests end def on_start(event) - @sender = event.container.create_sender(@url) - @receiver = event.container.create_receiver(@sender.connection, :dynamic => true) + c = event.container.connect(@url) + @sender = c.open_sender(@address) + @receiver = c.open_receiver({:dynamic => true}) end def next_request @@ -70,13 +72,10 @@ REQUESTS = ["Twas brillig, and the slithy toves", "All mimsy were the borogroves,", "And the mome raths outgrabe."] -options = { - :address => "localhost:5672/examples", -} - -OptionParser.new do |opts| - opts.banner = "Usage: client.rb [options]" - opts.on("-a", "--address=ADDRESS", "Send messages to ADDRESS (def. #{options[:address]}).") { |address| options[:address] = address } -end.parse! 
- -Qpid::Proton::Reactor::Container.new(Client.new(options[:address], REQUESTS)).run +if ARGV.size != 2 + STDERR.puts "Usage: #{__FILE__} URL ADDRESS +Connect to URL and send messages to ADDRESS" + return 1 +end +url, address = ARGV +Qpid::Proton::Container.new(Client.new(url, address, REQUESTS)).run http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/51cda9d5/examples/ruby/direct_recv.rb ---------------------------------------------------------------------- diff --git a/examples/ruby/direct_recv.rb b/examples/ruby/direct_recv.rb index 411efba..b2b0ba9 100644 --- a/examples/ruby/direct_recv.rb +++ b/examples/ruby/direct_recv.rb @@ -22,15 +22,16 @@ require 'optparse' class DirectReceive < Qpid::Proton::Handler::MessagingHandler - def initialize(url, expected) + def initialize(url, address, count) super() @url = url - @expected = expected + @address = address + @expected = count @received = 0 end def on_start(event) - @acceptor = event.container.listen(@url) + event.container.listen(@url) end def on_message(event) @@ -38,29 +39,17 @@ class DirectReceive < Qpid::Proton::Handler::MessagingHandler puts "Received: #{event.message.body}" @received = @received + 1 if @received == @expected - event.connection.close - @acceptor.close + event.container.stop end end end end -options = { - :address => "localhost:5672/examples", - :messages => 100, -} - -OptionParser.new do |opts| - opts.banner = "Usage: simple_send.rb [options]" - - opts.on("-a", "--address=ADDRESS", "Send messages to ADDRESS (def. #{options[:address]}).") do |address| - options[:address] = address - end - - opts.on("-m", "--messages=COUNT", "The number of messages to send (def. #{options[:messages]}", - OptionParser::DecimalInteger) do |messages| - options[:messages] = messages - end -end.parse! +unless (2..3).include? 
ARGV.size + STDERR.puts "Usage: #{__FILE__} URL ADDRESS [COUNT] +Listen on URL and receive COUNT messages from ADDRESS" + return 1 +end +url, address, count = ARGV +Qpid::Proton::Container.new(DirectReceive.new(url, address, count || 10)).run -Qpid::Proton::Reactor::Container.new(DirectReceive.new(options[:address], options[:messages])).run http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/51cda9d5/examples/ruby/direct_send.rb ---------------------------------------------------------------------- diff --git a/examples/ruby/direct_send.rb b/examples/ruby/direct_send.rb index ed679aa..e54e1ab 100644 --- a/examples/ruby/direct_send.rb +++ b/examples/ruby/direct_send.rb @@ -20,23 +20,19 @@ require 'qpid_proton' require 'optparse' -options = { - :address => "localhost:5672/examples", - :messages => 100, -} +class DirectSend < Qpid::Proton::Handler::MessagingHandler -class SimpleSend < Qpid::Proton::Handler::MessagingHandler - - def initialize(url, expected) + def initialize(url, address, expected) super() @url = url + @address = address @sent = 0 @confirmed = 0 @expected = expected end def on_start(event) - @acceptor = event.container.listen(@url) + event.container.listen(@url) end def on_sendable(event) @@ -51,26 +47,15 @@ class SimpleSend < Qpid::Proton::Handler::MessagingHandler @confirmed = @confirmed + 1 if @confirmed == @expected puts "All #{@expected} messages confirmed!" - event.connection.close + event.container.stop end end end -OptionParser.new do |opts| - opts.banner = "Usage: simple_send.rb [options]" - - opts.on("-a", "--address=ADDRESS", "Send messages to ADDRESS (def. #{options[:address]}).") do |address| - options[:address] = address - end - - opts.on("-m", "--messages=COUNT", "The number of messages to send (def. #{options[:messages]}", - OptionParser::DecimalInteger) do |messages| - options[:messages] = messages - end -end.parse!
- -begin - Qpid::Proton::Reactor::Container.new(SimpleSend.new(options[:address], options[:messages])).run -rescue Interrupt => error - puts "ERROR: #{error}" +unless (2..3).include? ARGV.size + STDERR.puts "Usage: #{__FILE__} URL ADDRESS [COUNT] +Listen on URL and send COUNT messages to ADDRESS" + return 1 end +url, address, count = ARGV +Qpid::Proton::Container.new(DirectSend.new(url, address, count || 10)).run http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/51cda9d5/examples/ruby/example_test.rb ---------------------------------------------------------------------- diff --git a/examples/ruby/example_test.rb b/examples/ruby/example_test.rb index 12ab784..bcb6aa6 100755 --- a/examples/ruby/example_test.rb +++ b/examples/ruby/example_test.rb @@ -1,4 +1,4 @@ -#!/usr/bin/enc ruby +#!/usr/bin/env ruby # # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file @@ -22,56 +22,36 @@ require 'minitest/autorun' require 'qpid_proton' require 'socket' -class ExampleTest < MiniTest::Test - - def run_script(script, port) - cmd = [RbConfig.ruby, script] - cmd += ["-a", ":#{port}/examples"] if port - return IO.popen(cmd) +# Wait for the broker to be listening +def wait_for(url, timeout = 5) + deadline = Time.now + 5 + begin + TCPSocket.open("", URI(url).port).close + rescue Errno::ECONNREFUSED + retry if Time.now < deadline + raise end +end - def assert_output(script, want, port=nil) - out = run_script(script, port) - assert_equal want, out.read.strip - end +# URL with an unused port +def test_url() + "amqp://:#{TCPServer.open(0) { |s| s.addr[1] }}" +end - def test_helloworld - assert_output("helloworld.rb", "Hello world!", $port) - end - def test_send_recv - assert_output("simple_send.rb", "All 100 messages confirmed!", $port) - want = (0..99).reduce("") { |x,y| x << "Received: sequence #{y}\n" } - assert_output("simple_recv.rb", want.strip, $port) - end +class ExampleTest < MiniTest::Test - def 
test_direct_recv - TestPort.new do |tp| - p = run_script("direct_recv.rb", tp.port) - wait_port tp.port - assert_output("simple_send.rb", "All 100 messages confirmed!", tp.port) - want = (0..99).reduce("") { |x,y| x << "Received: sequence #{y}\n" } - assert_equal(want.strip, p.read.strip) - end + def run_script(*args) + return IO.popen([ RbConfig.ruby ] + args.map { |a| a.to_s }) end - def test_direct_send - TestPort.new do |tp| - p = run_script("direct_send.rb", tp.port) - wait_port tp.port - want = (0..99).reduce("") { |x,y| x << "Received: sequence #{y}\n" } - assert_output("simple_recv.rb", want.strip, $port) - assert_equal("All 100 messages confirmed!", p.read.strip) - end + def assert_output(want, *args) + out = run_script(*args) + assert_equal(want, out.read.strip) end - def test_direct_send - TestPort.start_wait do |port| - p = run_script("direct_recv.rb", port) - assert_output("simple_send.rb", "All 100 messages confirmed!", port) - want = (0..99).reduce("") { |x,y| x << "Received: sequence #{y}\n" } - assert_equal(want.strip, p.read.strip) - end + def test_helloworld + assert_output("Hello world!", "helloworld.rb", $url, __method__) end def test_client_server @@ -85,28 +65,57 @@ class ExampleTest < MiniTest::Test -> And the mome raths outgrabe. <- AND THE MOME RATHS OUTGRABE. EOS - srv = run_script("server.rb", $port) - assert_output("client.rb", want.strip, $port) - + server = run_script("server.rb", $url, __method__) + assert_output(want.strip, "client.rb", $url, __method__) ensure - Process.kill :TERM, srv.pid if srv + Process.kill :TERM, server.pid if server end -end -# Start the broker before all tests. 
-$port = TCPServer.open(0) do |s| s.addr[1]; end # find an unused port -$broker = spawn("#{RbConfig.ruby} reactor/broker.rb -a :#{$port}") + def test_send_recv + assert_output("All 10 messages confirmed!", "simple_send.rb", $url, __method__) + want = (0..9).reduce("") { |x,y| x << "Received: sequence #{y}\n" } + assert_output(want.strip, "simple_recv.rb", $url, __method__) + end -# Wait for the broker to be listening -deadline = Time.now + 5 -begin - TCPSocket.open("", $port).close -rescue Errno::ECONNREFUSED - retry if Time.now < deadline - raise + def test_helloworld_direct + url = test_url + assert_output("Hello world!", "helloworld_direct.rb", url, __method__) + end + + def test_direct_recv + url = test_url + p = run_script("direct_recv.rb", url, __method__) + wait_for url + assert_output("All 10 messages confirmed!", "simple_send.rb", url, __method__) + want = (0..9).reduce("") { |x,y| x << "Received: sequence #{y}\n" } + assert_equal(want.strip, p.read.strip) + end + + def test_direct_send + url = test_url + p = run_script("direct_send.rb", url, __method__) + wait_for url + want = (0..9).reduce("") { |x,y| x << "Received: sequence #{y}\n" } + assert_output(want.strip, "simple_recv.rb", url, __method__) + assert_equal("All 10 messages confirmed!", p.read.strip) + end + + def test_direct_send + url = test_url + p = run_script("direct_recv.rb", url, __method__) + wait_for url + assert_output("All 10 messages confirmed!", "simple_send.rb", url, __method__) + want = (0..9).reduce("") { |x,y| x << "Received: sequence #{y}\n" } + assert_equal(want.strip, p.read.strip) + end end +# Start the broker before all tests. 
+$url = test_url +$broker = IO.popen([RbConfig.ruby, 'broker.rb', $url]) +wait_for $url + # Kill the broker after all tests MiniTest.after_run do - Process.kill(:TERM, $broker) if $broker + Process.kill(:TERM, $broker.pid) if $broker end http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/51cda9d5/examples/ruby/helloworld.rb ---------------------------------------------------------------------- diff --git a/examples/ruby/helloworld.rb b/examples/ruby/helloworld.rb index 9b02e8a..2b2d4f5 100644 --- a/examples/ruby/helloworld.rb +++ b/examples/ruby/helloworld.rb @@ -22,16 +22,15 @@ require 'optparse' class HelloWorld < Qpid::Proton::Handler::MessagingHandler - def initialize(server, address) + def initialize(url, address) super() - @server = server - @address = address + @url, @address = url, address end def on_start(event) - conn = event.container.connect(:address => @server) - event.container.create_sender(conn, :target => @address) - event.container.create_receiver(conn, :source => @address) + conn = event.container.connect(@url) + conn.open_sender(@address) + conn.open_receiver(@address) end def on_sendable(event) @@ -51,23 +50,10 @@ class HelloWorld < Qpid::Proton::Handler::MessagingHandler end end -options = { - :address => "localhost:5672", - :queue => "examples" -} - -OptionParser.new do |opts| - opts.banner = "Usage: helloworld_direct.rb [options]" - - opts.on("-a", "--address=ADDRESS", "Send messages to ADDRESS (def. #{options[:address]}).") do |address| - options[:address] = address - end - - opts.on("-q", "--queue=QUEUE", "Send messages to QUEUE (def. #{options[:queue]})") do |queue| - options[:queue] = queue - end - -end.parse! 
- -hw = HelloWorld.new(options[:address], "examples") -Qpid::Proton::Reactor::Container.new(hw).run +if ARGV.size != 2 + STDERR.puts "Usage: #{__FILE__} URL ADDRESS +Connect to URL, send a message to ADDRESS and receive it back" + return 1 +end +url, address = ARGV +Qpid::Proton::Container.new(HelloWorld.new(url, address)).run http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/51cda9d5/examples/ruby/helloworld_direct.rb ---------------------------------------------------------------------- diff --git a/examples/ruby/helloworld_direct.rb b/examples/ruby/helloworld_direct.rb index e98cc1f..dab368b 100644 --- a/examples/ruby/helloworld_direct.rb +++ b/examples/ruby/helloworld_direct.rb @@ -20,22 +20,19 @@ require 'qpid_proton' require 'optparse' -options = { - :address => "localhost:5672/examples", -} - class HelloWorldDirect < Qpid::Proton::Handler::MessagingHandler include Qpid::Proton::Util::Wrapper - def initialize(url) + def initialize(url, address) super() - @url = url + @url, @address = url, address end def on_start(event) - @acceptor = event.container.listen(@url) - event.container.create_sender(@url) + event.container.listen(@url) + c = event.container.connect(@url) # Connect to self! + c.open_sender(@address) end def on_sendable(event) @@ -50,25 +47,14 @@ class HelloWorldDirect < Qpid::Proton::Handler::MessagingHandler end def on_accepted(event) - event.connection.close - end - - def on_connection_closed(event) - @acceptor.close + event.container.stop end - end -OptionParser.new do |opts| - opts.banner = "Usage: helloworld_direct.rb [options]" - - opts.on("-a", "--address=ADDRESS", "Send messages to ADDRESS (def. #{options[:address]}).") do |address| - options[:address] = address - end - -end.parse! 
- -begin - Qpid::Proton::Reactor::Container.new(HelloWorldDirect.new(options[:address])).run -rescue Interrupt => error +if ARGV.size != 2 + STDERR.puts "Usage: #{__FILE__} URL ADDRESS +Listen on and connect to URL (connect to self), send a message to ADDRESS and receive it back" + return 1 end +url, address = ARGV +Qpid::Proton::Container.new(HelloWorldDirect.new(url, address)).run http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/51cda9d5/examples/ruby/server.rb ---------------------------------------------------------------------- diff --git a/examples/ruby/server.rb b/examples/ruby/server.rb index 9373272..87eba99 100644 --- a/examples/ruby/server.rb +++ b/examples/ruby/server.rb @@ -22,35 +22,31 @@ require 'optparse' class Server < Qpid::Proton::Handler::MessagingHandler - def initialize(url) + def initialize(url, address) super() - @url = Qpid::Proton::URL.new url - @address = @url.path + @url = url + @address = address @senders = {} end def on_start(event) - @container = event.container - @conn = @container.connect(:url => @url) - @receiver = @container.create_receiver(@conn, :source => @address) + c = event.container.connect(@url) + c.open_receiver(@address) @relay = nil end def on_connection_opened(event) if event.connection.remote_offered_capabilities && - event.connection.remote_offered_capabilities.contain?("ANONYMOUS-RELAY") - @relay = @container.create_sender(@conn, nil) + event.connection.remote_offered_capabilities.contain?("ANONYMOUS-RELAY") + @relay = event.connection.open_sender({:target => nil}) end end def on_message(event) msg = event.message + return unless msg.reply_to # Not a request message puts "<- #{msg.body}" - sender = @relay || @senders[msg.reply_to] - if sender.nil? 
- sender = @container.create_sender(@conn, :target => msg.reply_to) - @senders[msg.reply_to] = sender - end + sender = @relay || (@senders[msg.reply_to] ||= event.connection.open_sender(msg.reply_to)) reply = Qpid::Proton::Message.new reply.address = msg.reply_to reply.body = msg.body.upcase @@ -64,13 +60,10 @@ class Server < Qpid::Proton::Handler::MessagingHandler end end -options = { - :address => "localhost:5672/examples", -} - -OptionParser.new do |opts| - opts.banner = "Usage: server.rb [options]" - opts.on("-a", "--address=ADDRESS", "Send messages to ADDRESS (def. #{options[:address]}).") { |address| options[:address] = address } -end.parse! - -Qpid::Proton::Reactor::Container.new(Server.new(options[:address])).run() +if ARGV.size != 2 + STDERR.puts "Usage: #{__FILE__} URL ADDRESS +Server listening on URL, reply to messages to ADDRESS" + return 1 +end +url, address = ARGV +Qpid::Proton::Container.new(Server.new(url, address)).run http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/51cda9d5/examples/ruby/simple_recv.rb ---------------------------------------------------------------------- diff --git a/examples/ruby/simple_recv.rb b/examples/ruby/simple_recv.rb index 47b21ed..d1a9607 100644 --- a/examples/ruby/simple_recv.rb +++ b/examples/ruby/simple_recv.rb @@ -20,17 +20,19 @@ require 'qpid_proton' require 'optparse' -class Receiver < Qpid::Proton::Handler::MessagingHandler +class SimpleReceive < Qpid::Proton::Handler::MessagingHandler - def initialize(url, count) + def initialize(url, address, count) super() @url = url + @address = address @expected = count @received = 0 end def on_start(event) - event.container.create_receiver(@url) + c = event.container.connect(@url) + c.open_receiver(@address) end def on_message(event) @@ -42,28 +44,14 @@ class Receiver < Qpid::Proton::Handler::MessagingHandler end end end - end -options = { - :address => "localhost:5672/examples", - :messages => 100, -} - -OptionParser.new do |opts| - opts.banner = "Usage: 
simple_send.rb [options]" - - opts.on("-a", "--address=ADDRESS", "Send messages to ADDRESS (def. #{options[:address]}).") do |address| - options[:address] = address - end +unless (2..3).include? ARGV.size + STDERR.puts "Usage: #{__FILE__} URL ADDRESS [COUNT]} +Connect to URL and receive COUNT messages from ADDRESS" + return 1 +end +url, address, count = ARGV - opts.on("-m", "--messages=COUNT", "The number of messages to send (def. #{options[:messages]}", - OptionParser::DecimalInteger) do |messages| - options[:messages] = messages - end -end.parse! +Qpid::Proton::Container.new(SimpleReceive.new(url, address, count || 10)).run -begin - Qpid::Proton::Reactor::Container.new(Receiver.new(options[:address], options[:messages])).run -rescue Interrupt -end http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/51cda9d5/examples/ruby/simple_send.rb ---------------------------------------------------------------------- diff --git a/examples/ruby/simple_send.rb b/examples/ruby/simple_send.rb index 38857f3..25fc56c 100644 --- a/examples/ruby/simple_send.rb +++ b/examples/ruby/simple_send.rb @@ -20,23 +20,20 @@ require 'qpid_proton' require 'optparse' -options = { - :address => "localhost:5672/examples", - :messages => 100, -} - class SimpleSend < Qpid::Proton::Handler::MessagingHandler - def initialize(url, expected) + def initialize(url, address, expected) super() @url = url + @address = address @sent = 0 @confirmed = 0 @expected = expected end def on_start(event) - event.container.create_sender(@url) + c = event.container.connect(@url) + c.open_sender(@address) end def on_sendable(event) @@ -54,20 +51,12 @@ class SimpleSend < Qpid::Proton::Handler::MessagingHandler event.connection.close end end - end -OptionParser.new do |opts| - opts.banner = "Usage: simple_send.rb [options]" - - opts.on("-a", "--address=ADDRESS", "Send messages to ADDRESS (def. 
#{options[:address]}).") do |address|
-    options[:address] = address
-  end
-
-  opts.on("-m", "--messages=COUNT", "The number of messages to send (def. #{options[:messages]}",
-          OptionParser::DecimalInteger) do |messages|
-    options[:messages] = messages
-  end
-end.parse!
-
-Qpid::Proton::Reactor::Container.new(SimpleSend.new(options[:address], options[:messages])).run
+unless (2..3).include? ARGV.size
+  STDERR.puts "Usage: #{__FILE__} URL ADDRESS [COUNT]}
+Connect to URL and send COUNT messages to ADDRESS"
+  return 1
+end
+url, address, count = ARGV
+Qpid::Proton::Container.new(SimpleSend.new(url, address, count || 10)).run

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/51cda9d5/proton-c/bindings/ruby/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/CMakeLists.txt b/proton-c/bindings/ruby/CMakeLists.txt
index 46b7ba8..c00fd3d 100644
--- a/proton-c/bindings/ruby/CMakeLists.txt
+++ b/proton-c/bindings/ruby/CMakeLists.txt
@@ -45,7 +45,7 @@ set_target_properties(cproton-ruby
 ## Make a gem
-file(GLOB_RECURSE RUBY_SRC RELATIVE . 
*.rb *.rdoc)
+file(GLOB_RECURSE RUBY_SRC RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} *.rb *.rdoc)
 find_program(GEM_EXE gem DOC "Program to build and install ruby gem packages")
 mark_as_advanced(GEM_EXE)
@@ -133,7 +133,7 @@ if (YARD_EXE)
   add_custom_command(
     OUTPUT ${bin}/doc
     WORKING_DIRECTORY ${src}
-    COMMAND ${YARD_EXE} -q -o ${bin}/doc -b ${bin}/.yardoc --no-private -r README.rdoc
+    COMMAND ${YARD_EXE} -q -o ${bin}/doc -b ${bin}/.yardoc --no-progress --no-private -r README.rdoc
     DEPENDS ${RUBY_SRC}
     )
   add_custom_target(docs-ruby DEPENDS ${bin}/doc)

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/51cda9d5/proton-c/bindings/ruby/lib/core/connection.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/connection.rb b/proton-c/bindings/ruby/lib/core/connection.rb
index 6caf589..0c878d4 100644
--- a/proton-c/bindings/ruby/lib/core/connection.rb
+++ b/proton-c/bindings/ruby/lib/core/connection.rb
@@ -23,21 +23,17 @@ module Qpid::Proton
   class Connection < Endpoint
     protected
-    PROTON_METHOD_PREFIX = "pn_connection"
     include Util::SwigHelper
+    PROTON_METHOD_PREFIX = "pn_connection"
     public
     # @!attribute hostname
-    #
-    # @return [String] The AMQP hostname for the connection.
-    #
+    # @return [String] The AMQP hostname for the connection.
     proton_accessor :hostname
     # @!attribute user
-    # The user name for authentication. 
-    #
-    # @return [String] the user name
+    # @return [String] User name used for authentication (outgoing connection) or the authenticated user name (incoming connection)
     proton_accessor :user
     # @private
@@ -45,6 +41,7 @@ module Qpid::Proton
     # @private
     attr_accessor :overrides
+    # @private
     attr_accessor :session_policy
     # @private
@@ -68,12 +65,11 @@ module Qpid::Proton
     def initialize(impl = Cproton.pn_connection)
       super()
       @impl = impl
-      @offered_capabilities = nil
-      @desired_capabilities = nil
-      @properties = nil
       @overrides = nil
       @collector = nil
       @session_policy = nil
+      @link_count = 0
+      @link_prefix = ""
       self.class.store_instance(self, :pn_connection_attachments)
     end
@@ -195,18 +191,34 @@ module Qpid::Proton
     # Open the local end of the connection.
     #
-    # @option options [String] :container_id Unique AMQP container ID, defaults to a UUID
-    # @option options [String] :link_prefix Prefix for generated link names, default is container_id
-    #
-    def open(options={})
-      object_to_data(@offered_capabilities, Cproton.pn_connection_offered_capabilities(@impl))
-      object_to_data(@desired_capabilities, Cproton.pn_connection_desired_capabilities(@impl))
-      object_to_data(@properties, Cproton.pn_connection_properties(@impl))
-      cid = options[:container_id] || SecureRandom.uuid
-      Cproton.pn_connection_set_container(@impl, cid)
-      @link_prefix = options[:link_prefix] || cid
-      @link_prefix = SecureRandom.uuid if !@link_prefix || @link_prefix.empty?
-      @link_count = 0
+    # @option opts [MessagingHandler] :handler handler for events related to this connection.
+    # @option opts [String] :user user-name for authentication.
+    # @option opts [String] :password password for authentication.
+    # @option opts [Numeric] :idle_timeout seconds before closing an idle connection
+    # @option opts [Boolean] :sasl_enabled Enable or disable SASL.
+    # @option opts [Boolean] :sasl_allow_insecure_mechs Allow mechanisms that disclose clear text
+    #   passwords, even over an insecure connection. 
+    # @option opts [String] :sasl_allowed_mechs the allowed SASL mechanisms for use on the connection.
+    # @option opts [String] :container_id AMQP container ID, normally provided by {Container}
+    #
+    def open(opts={})
+      return if local_active?
+      apply opts
+      Cproton.pn_connection_open(@impl)
+    end
+
+    # @private
+    def apply opts
+      # NOTE: Only connection options are set here. Transport options are set
+      # with {Transport#apply} from the connection_driver (or in
+      # on_connection_bound if not using a connection_driver)
+      Cproton.pn_connection_set_container(@impl, opts[:container_id] || SecureRandom.uuid)
+      Cproton.pn_connection_set_user(@impl, opts[:user]) if opts[:user]
+      Cproton.pn_connection_set_password(@impl, opts[:password]) if opts[:password]
+      @link_prefix = opts[:link_prefix] || container_id
+      object_to_data(opts[:offered_capabilities], Cproton.pn_connection_offered_capabilities(@impl))
+      object_to_data(opts[:desired_capabilities], Cproton.pn_connection_desired_capabilities(@impl))
+      object_to_data(opts[:properties], Cproton.pn_connection_properties(@impl))
       Cproton.pn_connection_open(@impl)
     end
@@ -220,8 +232,11 @@ module Qpid::Proton
     # Once this operation has completed, the #LOCAL_CLOSED state flag will be
     # set.
     #
-    def close
-      self._update_condition
+    def close(error = nil)
+      if error
+        @condition = Condition.make error
+        self._update_condition
+      end
       Cproton.pn_connection_close(@impl)
     end
@@ -257,10 +272,10 @@ module Qpid::Proton
     end
     # Open a sender on the default_session
-    def open_sender(*args, &block) default_session.open_sender(*args, &block) end
+    def open_sender(opts = {}) default_session.open_sender(opts) end
     # Open a on the default_session
-    def open_receiver(*args, &block) default_session.open_receiver(*args, &block) end
+    def open_receiver(opts = {}) default_session.open_receiver(opts) end
     # Returns the first session from the connection that matches the specified
     # state mask.
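
The option handling added to Connection#open and Connection#apply above can be pictured with a small stand-alone sketch. SketchConnection is a hypothetical stand-in, not part of qpid_proton: it makes no Cproton calls and keeps only the `return if local_active?` guard and the `||` defaulting of :container_id and :link_prefix, so it is an illustration under those assumptions rather than the binding's behaviour.

```ruby
require 'securerandom'

# Hypothetical stand-in for Qpid::Proton::Connection. It mirrors only the
# guard and the option defaulting visible in the diff; nothing here talks
# to the C library.
class SketchConnection
  attr_reader :container_id, :user, :link_prefix, :open_count

  def initialize
    @active = false
    @open_count = 0
  end

  def local_active?() @active; end

  # A second call returns early, like `return if local_active?` in the diff.
  def open(opts = {})
    return if local_active?
    apply opts
    @active = true
    @open_count += 1
  end

  private

  def apply(opts)
    # container_id falls back to a random UUID
    @container_id = opts[:container_id] || SecureRandom.uuid
    @user = opts[:user] if opts[:user]
    # link_prefix falls back to the container id
    @link_prefix = opts[:link_prefix] || @container_id
  end
end

c = SketchConnection.new
c.open(:container_id => "example-container", :user => "guest")
c.open(:container_id => "ignored")  # already open: options are not re-applied
puts c.container_id
puts c.link_prefix
```

In this sketch the second open leaves the first container id in place, and link_prefix follows the container id because no :link_prefix was given.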
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/51cda9d5/proton-c/bindings/ruby/lib/core/connection_driver.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/connection_driver.rb b/proton-c/bindings/ruby/lib/core/connection_driver.rb
index b5b38ac..6c85659 100644
--- a/proton-c/bindings/ruby/lib/core/connection_driver.rb
+++ b/proton-c/bindings/ruby/lib/core/connection_driver.rb
@@ -36,23 +36,30 @@ module Qpid
       #
       # @param io [#read_nonblock, #write_nonblock] An {IO} or {IO}-like object that responds
       #   to #read_nonblock and #write_nonblock.
-      # @param handler [MessagingHandler] The handler to be invoked for AMQP events
-      #
-      def initialize io, handler=nil
+      # @param opts [Hash] See {Connection#open} - transport options are set here,
+      # remaining options
+      # @pram server [Bool] If true create a server (incoming) connection
+      def initialize(io, opts = {}, server=false)
         @impl = Cproton.pni_connection_driver or raise RuntimeError, "cannot create connection driver"
         @io = io
-        @handler = handler || Handler::MessagingHandler.new # Default handler for default behaviour
-        @rbuf = "" # String to re-use as read buffer
+        @handler = opts[:handler] || Handler::MessagingHandler.new # Default handler if missing
+        @rbuf = "" # String to re-use as read buffer
+        connection.apply opts
+        transport.set_server if server
+        transport.apply opts
       end
-      # @return [MessagingHandler]
       attr_reader :handler
       # @return [Connection]
-      def connection() Connection.wrap(Cproton.pni_connection_driver_connection(@impl)); end
+      def connection()
+        @connection ||= Connection.wrap(Cproton.pni_connection_driver_connection(@impl))
+      end
       # @return [Transport]
-      def transport() Transport.wrap(Cproton.pni_connection_driver_transport(@impl)); end
+      def transport()
+        @transport ||= Transport.wrap(Cproton.pni_connection_driver_transport(@impl))
+      end
       # @return [IO] Allows ConnectionDriver to be passed directly to {IO#select}
       def to_io() @io; 
end
@@ -63,21 +70,15 @@ module Qpid
       # @return [Bool] True if the driver has data to write
       def can_write?() Cproton.pni_connection_driver_write_size(@impl) > 0; end
-      # True if read and write sides of the IO are closed. Note this does not imply
-      # {#finished?} since there may still be events to dispatch.
-      def closed?
-        Cproton.pn_connection_driver_read_closed(@impl) &&
-          Cproton.pn_connection_driver_read_closed(@impl)
-      end
-
-      # True if the ConnectionDriver has nothing left to do: {#closed?} and
-      # there are no more events to dispatch.
+      # True if the ConnectionDriver has nothing left to do: both sides of the
+      # transport are closed and there are no events to dispatch.
       def finished?() Cproton.pn_connection_driver_finished(@impl); end
       # Dispatch available events, call the relevant on_* methods on the {#handler}.
       def dispatch(extra_handlers = nil)
         extra_handlers ||= []
         while event = Event::Event.wrap(Cproton.pn_connection_driver_next_event(@impl))
+          pre_dispatch(event)
           event.dispatch(@handler)
           extra_handlers.each { |h| event.dispatch h }
         end
@@ -90,13 +91,12 @@ module Qpid
         return if size <= 0
         @io.read_nonblock(size, @rbuf) # Use the same string rbuf for reading each time
         Cproton.pni_connection_driver_read_copy(@impl, @rbuf) unless @rbuf.empty?
-      rescue Errno::EWOULDBLOCK, Errno::EAGAIN, Errno::EINTR
-        # Try again later.
-      rescue EOFError # EOF is not an error
-        Cproton.pn_connection_driver_read_close(@impl)
-      rescue IOError => e # IOError is passed to the transport
-        error "read: #{e}"
-        Cproton.pn_connection_driver_read_close(@impl)
+      rescue Errno::EWOULDBLOCK, Errno::EAGAIN, Errno::EINTR
+        # Try again later.
+      rescue EOFError # EOF is not an error
+        close_read
+      rescue IOError, SystemCallError => e # is passed to the transport
+        close e
       end
       # Write to IO without blocking.
@@ -106,9 +106,8 @@ module Qpid
         Cproton.pn_connection_driver_write_done(@impl, n) if n > 0
       rescue Errno::EWOULDBLOCK, Errno::EAGAIN, Errno::EINTR
         # Try again later. 
-      rescue IOError => e
-        error "write: #{e}"
-        Cproton.pn_connection_driver_write_close(@impl)
+      rescue IOError, SystemCallError => e
+        close e
       end
       # Generate timed events and IO, for example idle-timeout and heart-beat events.
@@ -142,41 +141,37 @@ module Qpid
         return next_tick
       end
-      # Close the read side of the IO with optional error.
-      # @param e [#to_s] Non-nil error will call {#handler}.on_transport_error on next {#dispatch}
-      def close_read(e=nil)
-        @io.close_read
-        error(e)
-        Cproton.pn_connection_driver_read_close(@impl)
+      # Close the read side of the transport
+      def close_read
+        return if Cproton.pn_connection_driver_read_closed(@impl)
+        Cproton.pn_connection_driver_read_close(@impl)
+        @io.close_read
       end
-      # Close the write side of the IO with optional error
-      # @param e [#to_s] Non-nil error will call {#handler}.on_transport_error on next {#dispatch}
-      def close_write(e=nil)
-        @io.close_write
-        error(e)
-        Cproton.pn_connection_driver_write_close(@impl)
+      # Close the write side of the transport
+      def close_write
+        return if Cproton.pn_connection_driver_write_closed(@impl)
+        Cproton.pn_connection_driver_write_close(@impl)
+        @io.close_write
       end
       # Close both sides of the IO with optional error
-      # @param e [#to_s] Non-nil error will call {#handler}.on_transport_error on next {#dispatch}
-      def close(e=nil)
-        if !closed? 
-          close_read(e)
-          close_write(e)
+      # @param error [Condition] If non-nil pass to {#handler}.on_transport_error on next {#dispatch}
+      # Note `error` can be any value accepted by [Condition##make]
+      def close(error=nil)
+        if error
+          cond = Condition.make(error, "proton:io")
+          Cproton.pn_connection_driver_errorf(@impl, cond.name, "%s", cond.description)
         end
+        close_read
+        close_write
       end
-      def to_s
-        transport = Cproton.pni_connection_driver_tranport(@impl)
-        return "#<#{self.class.name}[#{transport}]:#{@io}>"
-      end
+      protected
-      private
+      # Override in subclass to add event context
+      def pre_dispatch(event) event; end
-      def error(e)
-        Cproton.pn_connection_driver_errorf(@impl, "proton:io", "%s", e.to_s) if e
-      end
     end
   end
 end

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/51cda9d5/proton-c/bindings/ruby/lib/core/container.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/container.rb b/proton-c/bindings/ruby/lib/core/container.rb
new file mode 100644
index 0000000..29df51b
--- /dev/null
+++ b/proton-c/bindings/ruby/lib/core/container.rb
@@ -0,0 +1,249 @@
+#--
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License. 
+#++
+
+require 'thread'
+require 'set'
+require_relative 'listener'
+
+module Qpid::Proton
+
+  # An AMQP container manages a set of {Connection}s which contain {#Sender} and
+  # {#Receiver} links to transfer messages.
+  #
+  # TODO aconway 2017-10-26: documentthreading/dispatch role
+  #
+  # Usually, each AMQP client or server process has a single container for
+  # all of its connections and links.
+  class Container
+    private
+
+    def amqp_uri(s) Qpid::Proton::amqp_uri s; end
+
+    class ConnectionDriver < Qpid::Proton::ConnectionDriver
+      def initialize container, io, opts, server=false
+        super io, opts, server
+        @container = container
+      end
+
+      def final() end
+      def pre_dispatch(event) event.container = @container; end
+    end
+
+    public
+
+    # Create a new Container
+    #
+    # @param opts [Hash] Options
+    # @option opts [String] :id A unique ID for this container. Defaults to a random UUID.
+    # @option opts [MessagingHandler] :handler Default handler for connections
+    #   that do not have their own handler (see {#connect} and {#listen})
+    #
+    # @note In a multi-threaded, multi-connection container the default
+    #   handler can be called concurrently for different connections. An
+    #   individual handler attached to a single connection is never called
+    #   concurrently, so that is the recommended approach for multi-threading.
+    def initialize(opts = {})
+      opts = { :handler => opts } unless opts.is_a? Hash # Allow handler as only parameter
+      opts = { :handler => opts } if opts.is_a? 
String # Allow ID as only parameter option
+      @handler = opts[:handler]
+      @id = String.new(opts[:id] || SecureRandom.uuid).freeze
+      @work = Queue.new
+      @work.push self # Let the first #run thread select
+      @wake = IO.pipe
+      @dummy = "" # Dummy buffer for draining wake pipe
+      @lock = Mutex.new
+      @selectables = Set.new # ConnectionDrivers and Listeners
+      @auto_stop = true
+      @active = 0 # activity (connection, listener) counter for auto_stop
+      @running = 0 # concurrent calls to #run
+    end
+
+    # @return [String] Unique identifier for this container
+    attr_reader :id
+
+    # Open an AMQP connection.
+    #
+    # @param url [String, URI] Open a {TCPSocket} to url.host, url.port.
+    # url.scheme must be "amqp" or "amqps", url.scheme.nil? is treated as "amqp"
+    # url.user, url.password are used as defaults if opts[:user], opts[:password] are nil
+    # @option (see Connection#open)
+    # @return [Connection] The new AMQP connection
+    #
+    def connect(url, opts = {})
+      url = amqp_uri(url)
+      opts[:user] ||= url.user
+      opts[:password] ||= url.password
+      # TODO aconway 2017-10-26: Use SSL for amqps URLs
+      connect_io(TCPSocket.new(url.host, url.port), opts)
+    end
+
+    # Open an AMQP protocol connection on an existing {IO} object
+    # @param io [IO] An existing {IO} object, e.g. a {TCPSocket}
+    # @option (see Connection#open)
+    def connect_io(io, opts = {})
+      cd = connection_driver(io, opts)
+      cd.connection.open()
+      add(cd).connection
+    end
+
+    # Listen for incoming AMQP connections
+    #
+    # @param url [String,URI] Listen on host:port of the AMQP URL
+    # @param handler [ListenHandler] A {ListenHandler} object that will be called
+    # with events for this listener and can generate a new set of options for each one.
+    # @return [Listener] The AMQP listener.
+    def listen(url, handler=ListenHandler.new)
+      url = amqp_uri(url)
+      # TODO aconway 2017-11-01: amqps
+      listen_io(TCPServer.new(url.host, url.port), handler)
+    end
+
+    # Listen for incoming AMQP connections on an existing server socket. 
+    # @param io A server socket, for example a {TCPServer}
+    # @param handler [ListenHandler] Handler for events from this listener
+    def listen_io(io, handler=ListenHandler.new)
+      add(Listener.new(io, handler))
+    end
+
+    # Run the container: wait for IO activity, dispatch events to handlers.
+    #
+    # More than one thread can call {#run} concurrently, the container will use
+    # all the {#run} ,threads as a pool to handle multiple connections
+    # concurrently. The container ensures that handler methods for a single
+    # connection (or listener) instance are serialized, even if the container
+    # has multiple threads.
+    #
+    def run()
+      @lock.synchronize { @running += 1 }
+
+      unless @on_start
+        @on_start = true
+        # TODO aconway 2017-10-28: proper synthesized event for on_start
+        event = Class.new do
+          def initialize(c) @container = c; end
+          attr_reader :container
+        end.new(self)
+        @handler.on_start(event) if @handler && @handler.respond_to?(:on_start)
+      end
+
+      while x = @work.pop
+        case x
+        when Container then
+          # Only one thread can select at a time
+          r, w = [@wake[0]], []
+          @lock.synchronize do
+            @selectables.each do |s|
+              r << s if s.send :can_read?
+              w << s if s.send :can_write?
+            end
+          end
+          r, w = IO.select(r, w)
+          selected = Set.new(r).merge(w)
+          if selected.delete?(@wake[0]) # Drain the wake pipe
+            begin
+              @wake[0].read_nonblock(256, @dummy) while true
+            rescue Errno::EWOULDBLOCK, Errno::EAGAIN, Errno::EINTR
+            end
+          end
+          @lock.synchronize do
+            if @stop_all
+              selected = @selectables
+              @selectables = Set.new
+              selected.each { |s| s.close }
+              @work << nil if selected.empty? # Already idle, initiate stop now
+              @stop_all = false
+            else
+              @selectables.subtract(selected)
+            end
+          end
+          # Move selected items to the work queue for serialized processing. 
+          @lock.synchronize { @selectables.subtract(selected) }
+          selected.each { |s| @work << s } # Queue up all the work
+          @work << self # Allow another thread to select()
+        when ConnectionDriver then
+          x.process
+          rearm x
+        when Listener then
+          io, opts = x.send :process
+          add(connection_driver(io, opts, true)) if io
+          rearm x
+        end
+        # TODO aconway 2017-10-26: scheduled tasks
+      end
+    ensure
+      @running -= 1
+      if @running > 0 # Signal the next #run thread that we are stopping
+        @work << nil
+        wake
+      end
+    end
+
+    # @!attribute auto_stop [rw]
+    # @return [Bool] With auto_stop enabled, all calls to {#run} will return when the
+    # container's last activity (connection, listener or scheduled event) is
+    # closed/completed. With auto_stop disabled {#run} does not return.
+    def auto_stop=(enabled) @lock.synchronize { @auto_stop=enabled }; wake; end
+    def auto_stop() @lock.synchronize { @auto_stop }; end
+
+    # Enable {#auto_stop} and close all connections and listeners with error.
+    # {#stop} returns immediately, calls to {#run} will return when all activity is finished.
+    # @param error [Condition] If non-nil pass to {#handler}.on_error
+    # Note `error` can be any value accepted by [Condition##make]
+    def stop(error=nil)
+      @lock.synchronize do
+        @auto_stop = true
+        @stop_all = true
+        wake
+      end
+    end
+
+    private
+
+    # Always wake when we add new work
+    def work(s) work << s; wake; end
+
+    def wake()
+      @wake[1].write_nonblock('x') rescue nil
+    end
+
+    def connection_driver(io, opts, server=false)
+      opts[:container_id] ||= @id
+      opts[:handler] ||= @handler
+      ConnectionDriver.new(self, io, opts, server)
+    end
+
+    def add(s)
+      @lock.synchronize do
+        @active += 1
+      end
+      @work << s
+      wake
+      return s
+    end
+
+    def rearm s
+      if s.send :finished?
+        @lock.synchronize { @work << nil if (@active -= 1).zero? 
&& @auto_stop }
+      else
+        @lock.synchronize { @selectables << s }
+      end
+      wake
+    end
+  end
+end

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/51cda9d5/proton-c/bindings/ruby/lib/core/endpoint.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/endpoint.rb b/proton-c/bindings/ruby/lib/core/endpoint.rb
index 7c6f0a3..04551eb 100644
--- a/proton-c/bindings/ruby/lib/core/endpoint.rb
+++ b/proton-c/bindings/ruby/lib/core/endpoint.rb
@@ -69,6 +69,9 @@ module Qpid::Proton
       object_to_condition(@condition, self._local_condition)
     end
+    def condition
+      condition_to_object(_local_condition) || remote_condition; end
+
     # @private
     def remote_condition
       condition_to_object(self._remote_condition)
@@ -82,6 +85,10 @@ module Qpid::Proton
       self.connection.transport
     end
+    # @private
+    # @return [Bool] true if {#state} has all the bits of `mask` set
+    def check_state(mask) (self.state & mask) == mask; end
+
     # @return [Bool] true if endpoint has sent and received a CLOSE frame
     def closed?() check_state(LOCAL_CLOSED | REMOTE_CLOSED); end
@@ -137,10 +144,6 @@ module Qpid::Proton
       Cproton.pn_decref(impl)
     end
-    private
-
-    def check_state(mask) (self.state & mask) == mask; end
-
   end
 end

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/51cda9d5/proton-c/bindings/ruby/lib/core/listener.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/listener.rb b/proton-c/bindings/ruby/lib/core/listener.rb
new file mode 100644
index 0000000..e5f5c0d
--- /dev/null
+++ b/proton-c/bindings/ruby/lib/core/listener.rb
@@ -0,0 +1,110 @@
+#--
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. 
The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#++
+
+module Qpid::Proton
+  # A listener for incoming connections.
+  #
+  # Create with {Container#listen} or {Container#listen_with}
+  class Listener
+    # The listener's container
+    attr_reader :container
+
+    # Close the listener
+    # @param error [Condition] Optional error condition.
+    def close(error=nil)
+      @closed ||= Condition.make(error) || true
+      @io.close_read rescue nil # Cause listener to wake out of IO.select
+    end
+
+    # Get the {IO} server socket used by the listener
+    def to_io() @io; end
+
+    private # Called by {Container}
+
+    def initialize(io, handler)
+      @io, @handler = io, handler
+    end
+
+    def process
+      unless @closed
+        unless @open_dispatched
+          dispatch(:on_open)
+          @open_dispatched = true
+        end
+        begin
+          return @io.accept, dispatch(:on_accept)
+        rescue IO::WaitReadable, Errno::EINTR
+        rescue IOError, SystemCallError => e
+          close e
+        end
+      end
+      if @closed
+        dispatch(:on_error, @closed) if @closed != true
+        dispatch(:on_close)
+        close @io unless @io.closed? rescue nil
+      end
+    end
+
+    def can_read?() true; end
+    def can_write?() false; end
+    def finished?() @closed; end
+
+    # TODO aconway 2017-11-06: logging strategy
+    TRUE = Set[:true, :"1", :yes, :on]
+    def log?()
+      enabled = ENV['PN_TRACE_EVT']
+      TRUE.include? enabled.downcase.to_sym if enabled
+    end
+
+    def dispatch(method, *args)
+      STDERR.puts "(Listener 0x#{object_id.to_s(16)})[#{method}]" if log? 
+      @handler.send(method, self, *args) if @handler && @handler.respond_to?(method)
+    end
+  end
+
+
+  # Class that handles listener events and provides options for accepted
+  # connections. This class simply returns a fixed set of options for every
+  # connection accepted, but you can subclass and override all of the on_
+  # methods to provide more interesting behaviour.
+  class ListenHandler
+    # @param opts [Hash] Options to return from on_accept.
+    def initialize(opts={}) @opts = opts; end
+
+    # Called when the listener is ready to accept connections.
+    # @param listener [Listener] The listener
+    def on_open(listener) end
+
+    # Called if an error occurs.
+    # If there is an error while opening the listener, this method is
+    # called and {#on_open} is not
+    # @param listener [Listener]
+    # @param what [Condition] Information about the error.
+    def on_error(listener, what) end
+
+    # Called when a listener accepts a new connection.
+    # @param listener [Listener] The listener
+    # @return [Hash] Options to apply to the incoming connection, see {#connect}
+    def on_accept(listener) @opts; end
+
+    # Called when the listener closes.
+    # @param listener [Listener] The listener accepting the connection.
+    def on_close(listener) end
+  end
+end

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/51cda9d5/proton-c/bindings/ruby/lib/core/session.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/session.rb b/proton-c/bindings/ruby/lib/core/session.rb
index 81135eb..3120d3f 100644
--- a/proton-c/bindings/ruby/lib/core/session.rb
+++ b/proton-c/bindings/ruby/lib/core/session.rb
@@ -126,7 +126,7 @@ module Qpid::Proton
     # @deprecated use {#open_receiver}
     def receiver(name) Receiver.new(Cproton.pn_receiver(@impl, name)); end
-    # TODO aconway 2016-01-04: doc options or target param
+    # TODO aconway 2016-01-04: doc options or target param, move option handling to Link. 
     def open_receiver(options = {})
       options = { :source => options } if options.is_a? String
       receiver = Receiver.new Cproton.pn_receiver(@impl, options[:name] || connection.link_name)

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/51cda9d5/proton-c/bindings/ruby/lib/core/transport.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/transport.rb b/proton-c/bindings/ruby/lib/core/transport.rb
index 61767fd..8804c8a 100644
--- a/proton-c/bindings/ruby/lib/core/transport.rb
+++ b/proton-c/bindings/ruby/lib/core/transport.rb
@@ -77,11 +77,6 @@ module Qpid::Proton
     TRACE_DRV = Cproton::PN_TRACE_DRV
     # @private
-    CLIENT = 1
-    # @private
-    SERVER = 2
-
-    # @private
     include Util::SwigHelper
     # @private
@@ -213,26 +208,19 @@ module Qpid::Proton
     def self.wrap(impl)
       return nil if impl.nil?
-      self.fetch_instance(impl, :pn_transport_attachments) || Transport.new(nil, impl)
+      self.fetch_instance(impl, :pn_transport_attachments) || Transport.new(impl)
     end
     # Creates a new transport instance.
-    #
-    # @param mode [Integer] The transport mode, either CLIENT or SERVER
-    # @param impl [pn_transport_t] Should not be used.
-    #
-    # @raise [TransportError] If the mode is invalid.
-    #
-    def initialize(mode = nil, impl = Cproton.pn_transport)
+    def initialize(impl = Cproton.pn_transport)
       @impl = impl
-      if mode == SERVER
-        Cproton.pn_transport_set_server(@impl)
-      elsif (!mode.nil? && mode != CLIENT)
-        raise TransportError.new("cannot create transport for mode: #{mode}")
-      end
       self.class.store_instance(self, :pn_transport_attachments)
     end
+    # Set server mode for this tranport - enables protocol detection
+    # and server-side authentication for incoming connections
+    def set_server() Cproton.pn_transport_set_server(@impl); end
+
     # Returns whether the transport has any buffered data.
     #
     # @return [Boolean] True if the transport has no buffered data.
@@ -418,6 +406,12 @@ module Qpid::Proton
       !@ssl.nil? 
     end
+    # @private
+    def apply opts
+      if opts[:sasl_enabled] != false # SASL is not disabled.
+        sasl.allow_insecure_mechs = opts[:sasl_allow_insecure_mechs] if opts[:sasl_allow_insecure_mechs]
+        sasl.allowed_mechs = opts[:sasl_allowed_mechs] if opts[:sasl_allowed_mechs]
+      end
+    end
   end
-
 end

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/51cda9d5/proton-c/bindings/ruby/lib/core/uri.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/uri.rb b/proton-c/bindings/ruby/lib/core/uri.rb
new file mode 100644
index 0000000..b29a719
--- /dev/null
+++ b/proton-c/bindings/ruby/lib/core/uri.rb
@@ -0,0 +1,50 @@
+#--
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License. 
+#++
+
+require 'uri'
+
+module URI
+  # AMQP URI scheme for the AMQP protocol
+  class AMQP < Generic
+    DEFAULT_PORT = 5672
+  end
+  @@schemes['AMQP'] = AMQP
+
+  # AMQPS URI scheme for the AMQP protocol over TLS
+  class AMQPS < AMQP
+    DEFAULT_PORT = 5671
+  end
+  @@schemes['AMQPS'] = AMQPS
+end
+
+module Qpid::Proton
+  # Convert s to an {URI::AMQP} or {URI::AMQPS}
+  # @param s [String,URI] If s has no scheme, use the {URI::AMQP} scheme
+  # @return [URI::AMQP]
+  # @raise [BadURIError] If s has a scheme that is not "amqp" or "amqps"
+  def self.amqp_uri(s)
+    u = URI(s)
+    u.host ||= "" # Behaves badly with nil host
+    return u if u.is_a? URI::AMQP
+    raise URI::BadURIError, "Not an AMQP URI: '#{u}'" if u.scheme
+    u.scheme = "amqp" unless u.scheme
+    u = URI::parse(u.to_s) # Re-parse with amqp scheme
+    return u
+  end
+end

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/51cda9d5/proton-c/bindings/ruby/lib/core/url.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/url.rb b/proton-c/bindings/ruby/lib/core/url.rb
index 30d2d87..b54df66 100644
--- a/proton-c/bindings/ruby/lib/core/url.rb
+++ b/proton-c/bindings/ruby/lib/core/url.rb
@@ -23,6 +23,7 @@ module Qpid::Proton
     attr_reader :scheme
     attr_reader :username
+    alias :user :username
     attr_reader :password
     attr_reader :host
     attr_reader :port
@@ -60,10 +61,14 @@ module Qpid::Proton
       Cproton.pn_url_get_port(@url).to_i
     end
+    # @return [String] Convert to string
     def to_s
       "#{@scheme}://#{@username.nil? ? '' : @username}#{@password.nil? ? 
'' : '@' + @password + ':'}#{@host}:#{@port}/#{@path}"
     end
+    # @return [String] Allow implicit conversion by {String#try_convert}
+    alias :to_str :to_s
+
     private
     def defaults

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/51cda9d5/proton-c/bindings/ruby/lib/event/event.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/event/event.rb b/proton-c/bindings/ruby/lib/event/event.rb
index e839f63..67d5e92 100644
--- a/proton-c/bindings/ruby/lib/event/event.rb
+++ b/proton-c/bindings/ruby/lib/event/event.rb
@@ -210,7 +210,7 @@ module Qpid::Proton
         Cproton.pn_handler_dispatch(handler.impl, @impl, type.number)
       else
         result = Qpid::Proton::Event.dispatch(handler, type.method, self)
-        if (result != "DELEGATED") && handler.respond_to?(:handlers)
+        if (result != "DELEGATED") && handler.respond_to?(:handlers) && handler.handlers
           handler.handlers.each do |hndlr|
             self.dispatch(hndlr)
           end
@@ -228,10 +228,11 @@ module Qpid::Proton
     end
     def container
-      impl = Cproton.pn_event_reactor(@impl)
-      Qpid::Proton::Util::ClassWrapper::WRAPPERS["pn_reactor"].call(impl)
+      @container || Util::ClassWrapper::WRAPPERS["pn_reactor"].call(Cproton.pn_event_reactor(@impl))
     end
+    def container=(c); @container = c; end
+
     # Returns the transport for this event.
     #
     # @return [Transport, nil] The transport.
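
The new Event#container setter works together with the pre_dispatch hook in ConnectionDriver: the container's driver subclass stamps each event before the handler sees it, and events that were never stamped fall back to the old lookup. A minimal sketch of that interplay follows. All class names are hypothetical stand-ins, symbols take the place of real container objects, and the fallback is an ordinary value rather than the reactor wrapper.

```ruby
# Hypothetical stand-ins; none of these classes belong to qpid_proton.
class SketchEvent
  attr_writer :container

  def initialize(fallback) @fallback = fallback; end

  # Prefer an explicitly assigned container, otherwise fall back
  # (the real Event#container falls back to the reactor wrapper).
  def container() @container || @fallback; end
end

class SketchDriver
  def initialize(events, handler)
    @events, @handler = events, handler
  end

  # Drain queued events, giving the hook a chance to add context first.
  def dispatch
    while (event = @events.shift)
      pre_dispatch(event)
      @handler.call(event)
    end
  end

  protected

  # Hook: the base driver adds nothing to the event.
  def pre_dispatch(event) event; end
end

class ContainerSketchDriver < SketchDriver
  def initialize(container, events, handler)
    super(events, handler)
    @container = container
  end

  # Stamp the owning container on every event before dispatch.
  def pre_dispatch(event) event.container = @container; end
end

seen = []
handler = lambda { |e| seen << e.container }
SketchDriver.new([SketchEvent.new(:reactor)], handler).dispatch
ContainerSketchDriver.new(:new_container, [SketchEvent.new(:reactor)], handler).dispatch
p seen
```

Keeping the fallback in the getter means handlers written against event.container do not need to know which kind of driver dispatched the event.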
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/51cda9d5/proton-c/bindings/ruby/lib/handler/endpoint_state_handler.rb ---------------------------------------------------------------------- diff --git a/proton-c/bindings/ruby/lib/handler/endpoint_state_handler.rb b/proton-c/bindings/ruby/lib/handler/endpoint_state_handler.rb index 11e970a..8a1a16e 100644 --- a/proton-c/bindings/ruby/lib/handler/endpoint_state_handler.rb +++ b/proton-c/bindings/ruby/lib/handler/endpoint_state_handler.rb @@ -84,11 +84,11 @@ module Qpid::Proton::Handler end def on_connection_remote_open(event) - if !(event.connection.state & Qpid::Proton::Endpoint::LOCAL_ACTIVE).zero? + if event.connection.local_active? self.on_connection_opened(event) elsif event.connection.local_uninit? self.on_connection_opening(event) - event.connection.open + event.connection.open unless event.connection.local_active? end end @@ -110,7 +110,7 @@ module Qpid::Proton::Handler end def on_link_remote_open(event) - if !(event.link.state & Qpid::Proton::Endpoint::LOCAL_ACTIVE).zero? + if event.link.local_active? self.on_link_opened(event) elsif event.link.local_uninit? 
self.on_link_opening(event) http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/51cda9d5/proton-c/bindings/ruby/lib/qpid_proton.rb ---------------------------------------------------------------------- diff --git a/proton-c/bindings/ruby/lib/qpid_proton.rb b/proton-c/bindings/ruby/lib/qpid_proton.rb index dae9b5f..43b8071 100644 --- a/proton-c/bindings/ruby/lib/qpid_proton.rb +++ b/proton-c/bindings/ruby/lib/qpid_proton.rb @@ -27,6 +27,9 @@ else require "securerandom" end +DEPRECATION = "[DEPRECATION]" +def deprecated(old, new) warn "#{DEPRECATION} #{old} is deprecated, use #{new}"; end + # Exception classes require "core/exceptions" @@ -61,6 +64,7 @@ require "event/collector" # Main Proton classes require "core/selectable" +require "core/uri" require "core/message" require "core/endpoint" require "core/session" @@ -109,6 +113,10 @@ require "reactor/session_per_connection" require "reactor/container" require "reactor/link_option" +# Core classes that depend on handlers and events +require "core/container" +require "core/connection_driver" + module Qpid::Proton # @private def self.registry http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/51cda9d5/proton-c/bindings/ruby/lib/reactor/reactor.rb ---------------------------------------------------------------------- diff --git a/proton-c/bindings/ruby/lib/reactor/reactor.rb b/proton-c/bindings/ruby/lib/reactor/reactor.rb index a84a716..88d062e 100644 --- a/proton-c/bindings/ruby/lib/reactor/reactor.rb +++ b/proton-c/bindings/ruby/lib/reactor/reactor.rb @@ -51,6 +51,7 @@ module Qpid::Proton::Reactor end def initialize(handlers, options = {}) + deprecated(self.class, "Qpid::Proton::Container") @impl = options[:impl] if @impl.nil? 
@impl = Cproton.pn_reactor http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/51cda9d5/proton-c/bindings/ruby/lib/util/condition.rb ---------------------------------------------------------------------- diff --git a/proton-c/bindings/ruby/lib/util/condition.rb b/proton-c/bindings/ruby/lib/util/condition.rb index ad49595..37c41a0 100644 --- a/proton-c/bindings/ruby/lib/util/condition.rb +++ b/proton-c/bindings/ruby/lib/util/condition.rb @@ -17,7 +17,7 @@ # under the License. #++ -module Qpid::Proton::Util +module Qpid::Proton class Condition @@ -29,19 +29,40 @@ module Qpid::Proton::Util @info = info end - # @private - def to_s - "Condition(#{@name}, #{@description}, #{@info})" - end + def to_s() "#{@name}: #{@description}"; end + + def inspect() "#{self.class.name}(#{@name.inspect}, #{@description.inspect}, #{@info.inspect})"; end - # @private def ==(other) - ((other.class = self.class) && + ((other.is_a? Condition) && (other.name == self.name) && (other.description == self.description) && (other.info == self.info)) end + # Make a condition. 
+ # @param obj the object to turn into a condition + # @param default_name condition name to use if obj does not imply a name + # @return + # - when Condition return obj unchanged + # - when Exception return Condition(obj.class.name, obj.to_s) + # - when nil then nil + # - else return Condition(default_name, obj.to_s) + # If objey + def self.make(obj, default_name="proton") + case obj + when Condition then obj + when Exception then Condition.new(obj.class.name, obj.to_s) + when nil then nil + else Condition.new(default_name, obj.to_s) + end + end + end + module Util #TODO aconway 2017-10-28: backwards compat + Condition = Qpid::Proton::Condition + end end http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/51cda9d5/proton-c/bindings/ruby/lib/util/engine.rb ---------------------------------------------------------------------- diff --git a/proton-c/bindings/ruby/lib/util/engine.rb b/proton-c/bindings/ruby/lib/util/engine.rb index e34faaa..fa5c038 100644 --- a/proton-c/bindings/ruby/lib/util/engine.rb +++ b/proton-c/bindings/ruby/lib/util/engine.rb @@ -70,8 +70,8 @@ module Qpid::Proton::Util unless object.nil? Cproton.pn_condition_set_name(condition, object.name) Cproton.pn_condition_set_description(condition, object.description) - info = Data.new(Cproton.pn_condition_info(condition)) - if object.info? + if !object.info.nil? + info = Data.new(Cproton.pn_condition_info(condition)) info.object = object.info end end http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/51cda9d5/proton-c/bindings/ruby/lib/util/uri.rb ---------------------------------------------------------------------- diff --git a/proton-c/bindings/ruby/lib/util/uri.rb b/proton-c/bindings/ruby/lib/util/uri.rb deleted file mode 100644 index 0820746..0000000 --- a/proton-c/bindings/ruby/lib/util/uri.rb +++ /dev/null @@ -1,27 +0,0 @@ -#-- -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. 
See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -#++ - -require 'uri' - -module URI - class AMQP < Generic; DEFAULT_PORT = 5672; end - @@schemes['AMQP'] = AMQP - class AMQPS < Generic; DEFAULT_PORT = 5671; end - @@schemes['AMQPS'] = AMQPS -end http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/51cda9d5/proton-c/bindings/ruby/tests/test_connection_driver.rb ---------------------------------------------------------------------- diff --git a/proton-c/bindings/ruby/tests/test_connection_driver.rb b/proton-c/bindings/ruby/tests/test_connection_driver.rb index 2ddc8ef..174e86d 100644 --- a/proton-c/bindings/ruby/tests/test_connection_driver.rb +++ b/proton-c/bindings/ruby/tests/test_connection_driver.rb @@ -39,11 +39,11 @@ class ConnectionDriverTest < Minitest::Test def on_message(event) @message = event.message; event.connection.close; end end - sender = ConnectionDriver.new(@sockets[0], send_class.new) + sender = ConnectionDriver.new(@sockets[0], {:handler => send_class.new}) sender.connection.open(); sender.connection.open_sender() - receiver = ConnectionDriver.new(@sockets[1], recv_class.new) + receiver = ConnectionDriver.new(@sockets[1], {:handler => recv_class.new}) drivers = [sender, receiver] until drivers.all? { |d| d.finished? } rd = drivers.select {|d| d.can_read? 
} @@ -60,7 +60,7 @@ class ConnectionDriverTest < Minitest::Test idle_class = Class.new(MessagingHandler) do def on_connection_bound(event) event.transport.idle_timeout = 10; end end - drivers = [ConnectionDriver.new(@sockets[0], idle_class.new), ConnectionDriver.new(@sockets[1])] + drivers = [ConnectionDriver.new(@sockets[0], {:handler => idle_class.new}), ConnectionDriver.new(@sockets[1])] drivers[0].connection.open() now = Time.now drivers.each { |d| d.process(true, true, now) } until drivers[0].connection.open? http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/51cda9d5/proton-c/bindings/ruby/tests/test_container.rb ---------------------------------------------------------------------- diff --git a/proton-c/bindings/ruby/tests/test_container.rb b/proton-c/bindings/ruby/tests/test_container.rb index 9c1d46c..41fa133 100644 --- a/proton-c/bindings/ruby/tests/test_container.rb +++ b/proton-c/bindings/ruby/tests/test_container.rb @@ -19,63 +19,137 @@ require 'test_tools' require 'minitest/unit' +require 'socket' Message = Qpid::Proton::Message SASL = Qpid::Proton::SASL -URL = Qpid::Proton::URL +Disposition = Qpid::Proton::Disposition -class ContainerTest < Minitest::Test +# Container that listens on a random port +class TestContainer < Container - def test_simple() + def initialize(opts = {}, lopts = {}) + super opts + @server = TCPServer.open(0) + @listener = listen_io(@server, ListenOnceHandler.new(lopts)) + end - hc = Class.new(TestServer) do - attr_reader :accepted + def port() @server.addr[1]; end + def url() "amqp://:#{port}"; end +end - def on_start(event) - super - event.container.create_sender("amqp://#{addr}", {:name => "testlink"}) +class ContainerTest < Minitest::Test + + def test_simple() + sh = Class.new(MessagingHandler) do + attr_reader :accepted, :sent + def on_sendable(e) + e.link.send Message.new("foo") unless @sent + @sent = true end - def on_sendable(event) - if @sent.nil? 
&& event.sender.credit > 0 - event.sender.send(Message.new("testmessage")) - @sent = true - end + def on_accepted(e) + @accepted = true + e.connection.close + end + end.new + + rh = Class.new(MessagingHandler) do + attr_reader :message, :link + def on_link_opening(e) + @link = e.link + e.link.open + e.link.flow(1) end - def on_accepted(event) - @accepted = event - event.container.stop + def on_message(e) + @message = e.message; + e.delivery.update Disposition::ACCEPTED + e.delivery.settle end - end - h = hc.new - Container.new(h).run - assert_instance_of(Qpid::Proton::Event::Event, h.accepted) - assert_equal "testlink", h.links.first.name - assert_equal "testmessage", h.messages.first.body + end.new + + c = TestContainer.new({:id => __method__.to_s, :handler => rh}) + c.connect(c.url, {:handler => sh}).open_sender({:name => "testlink"}) + c.run + + assert sh.accepted + assert_equal "testlink", rh.link.name + assert_equal "foo", rh.message.body + assert_equal "test_simple", rh.link.connection.container_id end + + class CloseOnOpenHandler < TestHandler + def on_connection_opened(e) e.connection.close; end + def on_connection_closing(e) e.connection.close; end + end + + def test_auto_stop + c1 = Container.new "#{__method__}1" + c2 = Container.new "#{__method__}2" + + # A listener and a connection + t1 = 3.times.collect { Thread.new { c1.run } } + l = c1.listen_io(TCPServer.new(0), ListenOnceHandler.new({ :handler => CloseOnOpenHandler.new})) + c1.connect("amqp://:#{l.to_io.addr[1]}", { :handler => CloseOnOpenHandler.new} ) + t1.each { |t| assert t.join(1) } + + # Connect between different containers, c2 has only a connection + t1 = Thread.new { c1.run } + l = c1.listen_io(TCPServer.new(0), ListenOnceHandler.new({ :handler => CloseOnOpenHandler.new})) + t2 = Thread.new {c2.run } + c2.connect("amqp://:#{l.to_io.addr[1]}", { :handler => CloseOnOpenHandler.new} ) + assert t2.join(1) + assert t1.join(1) + end + + def test_auto_stop_listener_only + c1 = Container.new 
"#{__method__}1" + # Listener only, external close + t1 = Thread.new { c1.run } + l = c1.listen_io(TCPServer.new(0)) + l.close + assert t1.join(1) + end + + def test_stop + c = Container.new __method__ + c.auto_stop = false + l = c.listen_io(TCPServer.new(0)) + c.connect("amqp://:#{l.to_io.addr[1]}") + threads = 5.times.collect { Thread.new { c.run } } + assert_nil threads[0].join(0.001) + c.stop + threads.each { |t| assert t.join(1) } + assert c.auto_stop # Set by stop + + # Stop an empty container + threads = 5.times.collect { Thread.new { c.run } } + assert_nil threads[0].join(0.001) + c.stop + threads.each { |t| assert t.join(1) } + end + end + class ContainerSASLTest < Minitest::Test # Handler for test client/server that sets up server and client SASL options - class SASLHandler < TestServer + class SASLHandler < TestHandler - attr_accessor :url - - def initialize(opts={}, mechanisms=nil, insecure=nil, realm=nil) + def initialize(url="amqp://", opts={}, mechanisms=nil, insecure=nil, realm=nil) super() - @opts, @mechanisms, @insecure, @realm = opts, mechanisms, insecure, realm + @url, @opts, @mechanisms, @insecure, @realm = url, opts, mechanisms, insecure, realm end def on_start(e) super - @client = e.container.connect(@url || "amqp://#{addr}", @opts) + @client = e.container.connect("#{@url}:#{e.container.port}", @opts) end def on_connection_bound(e) if e.connection != @client # Incoming server connection - @listener.close sasl = e.transport.sasl sasl.allow_insecure_mechs = @insecure unless @insecure.nil? sasl.allowed_mechs = @mechanisms unless @mechanisms.nil? 
@@ -86,6 +160,7 @@ class ContainerSASLTest < Minitest::Test end attr_reader :auth_user + def on_connection_opened(e) super if e.connection == @client @@ -135,26 +210,28 @@ mech_list: EXTERNAL DIGEST-MD5 SCRAM-SHA-1 CRAM-MD5 PLAIN ANONYMOUS end def test_sasl_anonymous() - s = SASLHandler.new({:sasl_allowed_mechs => "ANONYMOUS"}, "ANONYMOUS") - Container.new(s).run + s = SASLHandler.new("amqp://", {:sasl_allowed_mechs => "ANONYMOUS"}) + TestContainer.new({:id => __method__.to_s, :handler => s}, {:sasl_allowed_mechs => "ANONYMOUS"}).run assert_nil(s.connections[0].user) end def test_sasl_plain_url() + skip unless SASL.extended? # Use default realm with URL, should authenticate with "default_password" - s = SASLHandler.new({:sasl_allowed_mechs => "PLAIN", :sasl_allow_insecure_mechs => true}, "PLAIN", true) - s.url = ("amqp://user:default_password@#{s.addr}") - Container.new(s).run + opts = {:sasl_allowed_mechs => "PLAIN", :sasl_allow_insecure_mechs => true} + s = SASLHandler.new("amqp://user:default_password@", opts) + TestContainer.new({:id => __method__.to_s, :handler => s}, opts).run assert_equal(2, s.connections.size) assert_equal("user", s.auth_user) end def test_sasl_plain_options() + skip unless SASL.extended? 
# Use default realm with connection options, should authenticate with "default_password" opts = {:sasl_allowed_mechs => "PLAIN",:sasl_allow_insecure_mechs => true, :user => 'user', :password => 'default_password' } - s = SASLHandler.new(opts, "PLAIN", true) - Container.new(s).run + s = SASLHandler.new("amqp://", opts) + TestContainer.new({:id => __method__.to_s, :handler => s}, {:sasl_allowed_mechs => "PLAIN",:sasl_allow_insecure_mechs => true}).run assert_equal(2, s.connections.size) assert_equal("user", s.auth_user) end @@ -162,9 +239,9 @@ mech_list: EXTERNAL DIGEST-MD5 SCRAM-SHA-1 CRAM-MD5 PLAIN ANONYMOUS # Ensure we don't allow PLAIN if allow_insecure_mechs = true is not explicitly set def test_disallow_insecure() # Don't set allow_insecure_mechs, but try to use PLAIN - s = SASLHandler.new({:sasl_allowed_mechs => "PLAIN", :sasl_allow_insecure_mechs => true}, "PLAIN") - s.url = "amqp://user:password@#{s.addr}" - e = assert_raises(TestError) { Container.new(s).run } + s = SASLHandler.new("amqp://user:password@", {:sasl_allowed_mechs => "PLAIN", :sasl_allow_insecure_mechs => true}) + e = assert_raises(TestError) { TestContainer.new({:id => __method__.to_s, :handler => s}, {:sasl_allowed_mechs => "PLAIN"}).run } assert_match(/PN_TRANSPORT_ERROR.*unauthorized-access/, e.to_s) end end +