NO-JIRA: [ruby] Reorganize Container so that public methods come first; remove the protected section, which is no longer needed
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/d65528c0 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/d65528c0 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/d65528c0 Branch: refs/heads/go1 Commit: d65528c01cc3d3a82527fca4dd150cb10feaf220 Parents: d37c32c Author: Alan Conway <acon...@redhat.com> Authored: Thu Mar 22 09:15:24 2018 -0400 Committer: Alan Conway <acon...@redhat.com> Committed: Fri Mar 23 09:39:39 2018 -0400 ---------------------------------------------------------------------- proton-c/bindings/ruby/lib/core/container.rb | 316 +++++++++++----------- 1 file changed, 156 insertions(+), 160 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d65528c0/proton-c/bindings/ruby/lib/core/container.rb ---------------------------------------------------------------------- diff --git a/proton-c/bindings/ruby/lib/core/container.rb b/proton-c/bindings/ruby/lib/core/container.rb index f8ff032..2d920b4 100644 --- a/proton-c/bindings/ruby/lib/core/container.rb +++ b/proton-c/bindings/ruby/lib/core/container.rb @@ -29,163 +29,6 @@ module Qpid::Proton # One or more threads can call {#run}, events generated by all the listeners and # connections will be dispatched in the {#run} threads. 
class Container - private - - # Container driver applies options and adds container context to events - class ConnectionTask < Qpid::Proton::HandlerDriver - def initialize container, io, opts, server=false - super io, opts[:handler] - transport.set_server if server - transport.apply opts - connection.apply opts - end - end - - class ListenTask < Listener - - def initialize(io, handler, container) - super - @closing = @closed = nil - env = ENV['PN_TRACE_EVT'] - if env && ["true", "1", "yes", "on"].include?(env.downcase) - @log_prefix = "[0x#{object_id.to_s(16)}](PN_LISTENER_" - else - @log_prefix = nil - end - dispatch(:on_open); - end - - def process - return if @closed - unless @closing - begin - return @io.accept, dispatch(:on_accept) - rescue IO::WaitReadable, Errno::EINTR - rescue IOError, SystemCallError => e - close e - end - end - ensure - if @closing - @io.close rescue nil - @closed = true - dispatch(:on_error, @condition) if @condition - dispatch(:on_close) - end - end - - def can_read?() !finished?; end - def can_write?() false; end - def finished?() @closed; end - - def dispatch(method, *args) - # TODO aconway 2017-11-27: better logging - STDERR.puts "#{@log_prefix}#{([method[3..-1].upcase]+args).join ', '})" if @log_prefix - @handler.__send__(method, self, *args) if @handler && @handler.respond_to?(method) - end - end - - # Selectable object that can be used to wake IO.select from another thread - class SelectWaker - def initialize - @rd, @wr = IO.pipe - @lock = Mutex.new - @set = false - end - - def to_io() @rd; end - - def wake - @lock.synchronize do - return if @set # Don't write if already has data - @set = true - begin @wr.write_nonblock('x') rescue IO::WaitWritable end - end - end - - def reset - @lock.synchronize do - return unless @set - begin @rd.read_nonblock(1) rescue IO::WaitReadable end - @set = false - end - end - - def close - @rd.close - @wr.close - end - end - - def next_tick_due(x, now) - nt = x.respond_to?(:next_tick) && x.next_tick - 
nt && (nt <= now) - end - - def next_tick_min(x, t) - nt = x.respond_to?(:next_tick) && x.next_tick - nt if !t || (nt < t) - end - - # Rescue any exception raised by the block and stop the container. - def maybe_panic - begin - yield - rescue Exception => e - stop(nil, e) - end - end - - # Handle a single item from the @work queue, this is the heart of the #run loop. - def run_one(task) - case task - - when :start - @adapter.on_container_start(self) if @adapter.respond_to? :on_container_start - - when Container - r, w = [@wake], [] - next_tick = nil - @lock.synchronize do - @selectable.each do |s| - r << s if s.send :can_read? - w << s if s.send :can_write? - next_tick = next_tick_min(s, next_tick) - end - end - now = Time.now - timeout = ((next_tick > now) ? next_tick - now : 0) if next_tick - r, w = IO.select(r, w, nil, timeout) - now = Time.now - selected = Set.new(r).delete(@wake) - selected.merge(w) if w - selected.merge(@selectable.select { |s| next_tick_due(s, now) }) - @wake.reset - stop_select = nil - @lock.synchronize do - if stop_select = @stopped # close everything - selected += @selectable - selected.each { |s| s.close @stop_err } - @wake.close - end - @selectable -= selected # Remove selected tasks - end - selected.each { |s| @work << s } # Queue up tasks needing #process - @work << self unless stop_select - - when ConnectionTask then - maybe_panic { task.process } - rearm task - - when ListenTask then - io, opts = maybe_panic { task.process } - add(connection_driver(io, opts, true)) if io - rearm task - end - end - - public - # Error raised if the container is used after {#stop} has been called. class StoppedError < RuntimeError def initialize(*args) super("container has been stopped"); end @@ -261,7 +104,7 @@ module Qpid::Proton # Number of threads in {#run} # @return [Bool] {#run} thread count - def running; @lock.synchronize { @running }; end + def running() @lock.synchronize { @running }; end # Open an AMQP connection. 
# @@ -391,7 +234,160 @@ module Qpid::Proton @wake.wake end - protected + private + + # Container driver applies options and adds container context to events + class ConnectionTask < Qpid::Proton::HandlerDriver + def initialize container, io, opts, server=false + super io, opts[:handler] + transport.set_server if server + transport.apply opts + connection.apply opts + end + end + + class ListenTask < Listener + + def initialize(io, handler, container) + super + @closing = @closed = nil + env = ENV['PN_TRACE_EVT'] + if env && ["true", "1", "yes", "on"].include?(env.downcase) + @log_prefix = "[0x#{object_id.to_s(16)}](PN_LISTENER_" + else + @log_prefix = nil + end + dispatch(:on_open); + end + + def process + return if @closed + unless @closing + begin + return @io.accept, dispatch(:on_accept) + rescue IO::WaitReadable, Errno::EINTR + rescue IOError, SystemCallError => e + close e + end + end + ensure + if @closing + @io.close rescue nil + @closed = true + dispatch(:on_error, @condition) if @condition + dispatch(:on_close) + end + end + + def can_read?() !finished?; end + def can_write?() false; end + def finished?() @closed; end + + def dispatch(method, *args) + # TODO aconway 2017-11-27: better logging + STDERR.puts "#{@log_prefix}#{([method[3..-1].upcase]+args).join ', '})" if @log_prefix + @handler.__send__(method, self, *args) if @handler && @handler.respond_to?(method) + end + end + + # Selectable object that can be used to wake IO.select from another thread + class SelectWaker + def initialize + @rd, @wr = IO.pipe + @lock = Mutex.new + @set = false + end + + def to_io() @rd; end + + def wake + @lock.synchronize do + return if @set # Don't write if already has data + @set = true + begin @wr.write_nonblock('x') rescue IO::WaitWritable end + end + end + + def reset + @lock.synchronize do + return unless @set + begin @rd.read_nonblock(1) rescue IO::WaitReadable end + @set = false + end + end + + def close + @rd.close + @wr.close + end + end + + # Handle a single 
item from the @work queue, this is the heart of the #run loop. + def run_one(task) + case task + + when :start + @adapter.on_container_start(self) if @adapter.respond_to? :on_container_start + + when Container + r, w = [@wake], [] + next_tick = nil + @lock.synchronize do + @selectable.each do |s| + r << s if s.send :can_read? + w << s if s.send :can_write? + next_tick = next_tick_min(s, next_tick) + end + end + now = Time.now + timeout = ((next_tick > now) ? next_tick - now : 0) if next_tick + r, w = IO.select(r, w, nil, timeout) + now = Time.now + selected = Set.new(r).delete(@wake) + selected.merge(w) if w + selected.merge(@selectable.select { |s| next_tick_due(s, now) }) + @wake.reset + stop_select = nil + @lock.synchronize do + if stop_select = @stopped # close everything + selected += @selectable + selected.each { |s| s.close @stop_err } + @wake.close + end + @selectable -= selected # Remove selected tasks + end + selected.each { |s| @work << s } # Queue up tasks needing #process + @work << self unless stop_select + + when ConnectionTask then + maybe_panic { task.process } + rearm task + + when ListenTask then + io, opts = maybe_panic { task.process } + add(connection_driver(io, opts, true)) if io + rearm task + end + end + + def next_tick_due(x, now) + nt = x.respond_to?(:next_tick) && x.next_tick + nt && (nt <= now) + end + + def next_tick_min(x, t) + nt = x.respond_to?(:next_tick) && x.next_tick + nt if !t || (nt < t) + end + + # Rescue any exception raised by the block and stop the container. + def maybe_panic + begin + yield + rescue Exception => e + stop(nil, e) + end + end # Normally if we add work we need to set a wakeup to ensure a single #run # thread doesn't get stuck in select while there is other work on the queue. 
@@ -439,7 +435,7 @@ module Qpid::Proton end end - def not_stopped; raise StoppedError if @lock.synchronize { @stopped }; end + def not_stopped() raise StoppedError if @lock.synchronize { @stopped }; end end end --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org