PROTON-799: Added the engine_send and engine_recv examples to Ruby.
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/fb1d981e Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/fb1d981e Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/fb1d981e Branch: refs/heads/ruby-engine-apis Commit: fb1d981e4ce73186662841810d974bc8e996337d Parents: b0bca8f Author: Darryl L. Pierce <mcpie...@gmail.com> Authored: Tue Jan 13 16:27:57 2015 -0500 Committer: Darryl L. Pierce <mcpie...@gmail.com> Committed: Thu May 14 15:57:10 2015 -0400 ---------------------------------------------------------------------- examples/ruby/engine_recv.rb | 158 ++++++++++++++++++++++++++++++++ examples/ruby/engine_send.rb | 143 +++++++++++++++++++++++++++++ examples/ruby/lib/driver.rb | 69 ++++++++++++++ examples/ruby/lib/qpid_examples.rb | 28 ++++++ examples/ruby/lib/selectable.rb | 120 ++++++++++++++++++++++++ 5 files changed, 518 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/fb1d981e/examples/ruby/engine_recv.rb ---------------------------------------------------------------------- diff --git a/examples/ruby/engine_recv.rb b/examples/ruby/engine_recv.rb new file mode 100644 index 0000000..1529964 --- /dev/null +++ b/examples/ruby/engine_recv.rb @@ -0,0 +1,158 @@ +#-- +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +#++ + +require "qpid_examples" +require "optparse" + +DEFAULT_PORT = 5672 + +options = { + :port => DEFAULT_PORT, + :debug => false, + :verbose => false, +} + +OptionParser.new do |opts| + opts.banner = "Usage: engine_recv.rb [options]" + + opts.on("-p [port]", "--port [port]", + "The port to use (def. #{DEFAULT_PORT})") do |port| + options[:port] = port + end + + opts.on("-v", "--verbose", + "Enable verbose output") do + options[:verbose] = true + end + + opts.on("-d", + "--debug", "Enable debugging") do + options[:debug] = true + end + + opts.parse! +end + +server = TCPServer.new('localhost', options[:port]) + +last_time = Time.now + +message_count = 0 +driver = Driver.new + +collector = Qpid::Proton::Event::Collector.new + +loop do + begin + client = server.accept_nonblock + rescue Errno::EAGAIN, Errno::ECONNABORTED, Errno::EINTR, Errno::EWOULDBLOCK => error + + end + + unless client.nil? + puts "Connection from #{client.peeraddr.last}" + connection = Qpid::Proton::Connection.new + connection.collect(collector) + transport = Qpid::Proton::Transport.new(Qpid::Proton::Transport::SERVER) + transport.bind(connection) + selectable = Selectable.new(transport, client) + driver.add(selectable) + end + + # let the driver process data + driver.process + + event = collector.peek + + while !event.nil? + puts "EVENT: #{event}" if options[:debug] + + case event.type + when Qpid::Proton::Event::CONNECTION_INIT + conn = event.connection + if conn.state & Qpid::Proton::Endpoint::REMOTE_UNINIT + conn.transport.sasl.done(Qpid::Proton::SASL::OK) + end + + when Qpid::Proton::Event::CONNECTION_BOUND + conn = event.connection + if conn.state & Qpid::Proton::Endpoint::LOCAL_UNINIT + conn.open + end + + when Qpid::Proton::Event::CONNECTION_REMOTE_CLOSE + conn = event.context + if !(conn.state & Qpid::Proton::Endpoint::LOCAL_CLOSED) + conn.close + end + + when Qpid::Proton::Event::SESSION_REMOTE_OPEN + session = event.session + if session.state & Qpid::Proton::Endpoint::LOCAL_UNINIT + session.incoming_capacity = 1000000 + session.open + end + + when Qpid::Proton::Event::SESSION_REMOTE_CLOSE + session = event.session + if !(session.state & Qpid::Proton::Endpoint::LOCAL_CLOSED) + session.close + end + + when Qpid::Proton::Event::LINK_REMOTE_OPEN + link = event.link + if link.state & Qpid::Proton::Endpoint::LOCAL_UNINIT + link.open + link.flow 400 + end + + when Qpid::Proton::Event::LINK_REMOTE_CLOSE + link = event.context + if !(link.state & Qpid::Proton::Endpoint::LOCAL_CLOSED) + link.close + end + + when Qpid::Proton::Event::DELIVERY + link = event.link + delivery = event.delivery + if delivery.readable? && !delivery.partial? + # decode the message and display it + msg = Qpid::Proton::Util::Engine.receive_message(delivery) + message_count += 1 + puts "Received:" + puts " Count=#{message_count}" if options[:verbose] + puts " From=#{msg.id}" if msg.id + puts " Reply to=#{msg.reply_to}" if msg.reply_to + puts " Subject=#{msg.subject}" if msg.subject + puts " Body=#{msg.body}" if msg.body + puts "" + delivery.settle + credit = link.credit + link.flow(200) if credit <= 200 + end + + when Qpid::Proton::Event::TRANSPORT + driver.process + + end + + collector.pop + event = collector.peek + end +end http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/fb1d981e/examples/ruby/engine_send.rb ---------------------------------------------------------------------- diff --git a/examples/ruby/engine_send.rb b/examples/ruby/engine_send.rb new file mode 100644 index 0000000..189c7fd --- /dev/null +++ b/examples/ruby/engine_send.rb @@ -0,0 +1,143 @@ +#-- +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +#++ + +require 'qpid_examples' +require "optparse" + +DEFAULT_ADDRESS = "0.0.0.0:5672" + +options = { + :address => DEFAULT_ADDRESS, + :debug => false, + :verbose => false, + :count => 1, + :content => "This message was sent #{Time.new}" +} + +OptionParser.new do |opts| + opts.banner = "Usage: engine_recv.rb [options]" + + opts.on("-a [address]", "--address [address]", + "The target address (def. #{DEFAULT_ADDRESS})") do |address| + options[:address] = address + end + + opts.on("-C [content]", "--content [content]", + "The message content") do |content| + options[:content] = content + end + + opts.on("-c [count]", "--count [count]", + "The number of messages to send (def. 1)") do |count| + options[:count] = count.to_i + end + + opts.on("-v", "--verbose", + "Enable verbose output") do + options[:verbose] = true + end + + opts.on("-d", + "--debug", "Enable debugging") do + options[:debug] = true + end + + opts.parse! +end + + +driver = Driver.new + +conn = Qpid::Proton::Connection.new +collector = Qpid::Proton::Event::Collector.new +conn.collect(collector) + +session = conn.session +conn.open +session.open + +sender = session.sender("tvc_15_1") +sender.target.address = "queue" +sender.open + +transport = Qpid::Proton::Transport.new +transport.bind(conn) + +address, port = options[:address].split(":") + +socket = TCPSocket.new(address, port) +selectable = Selectable.new(transport, socket) +sent_count = 0 + +sent_count = 0 + +driver.add(selectable) + +loop do + # let the driver process + driver.process + + event = collector.peek + + unless event.nil? + + print "EVENT: #{event}\n" if options[:debug] + + case event.type + + when Qpid::Proton::Event::LINK_FLOW + sender = event.sender + credit = sender.credit + + message = Qpid::Proton::Message.new + + if credit > 0 && sent_count < options[:count] + sent_count = sent_count.next + message.clear + message.address = options[:address] + message.subject = "Message #{sent_count}..." + message.body = options[:content] + + delivery = sender.delivery("#{sent_count}") + sender.send(message.encode) + delivery.settle + sender.advance + credit = sender.credit + else + sender.close + end + + when Qpid::Proton::Event::LINK_LOCAL_CLOSE + link = event.link + link.close + link.session.close + + when Qpid::Proton::Event::SESSION_LOCAL_CLOSE + session = event.session + session.connection.close + + when Qpid::Proton::Event::CONNECTION_LOCAL_CLOSE + break + + end + + collector.pop + event = collector.peek + end +end http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/fb1d981e/examples/ruby/lib/driver.rb ---------------------------------------------------------------------- diff --git a/examples/ruby/lib/driver.rb b/examples/ruby/lib/driver.rb new file mode 100644 index 0000000..4e223d0 --- /dev/null +++ b/examples/ruby/lib/driver.rb @@ -0,0 +1,69 @@ +#-- +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +#++ + +class Driver + + def initialize + @selectables = {} + end + + def add(selectable) + @selectables[selectable.fileno] = selectable + end + + def process + reading = [] + writing = [] + + @selectables.each_value do |sel| + if sel.closed? || sel.fileno.nil? + @selectables.delete(sel.fileno) + else + begin + reading << sel.to_io if sel.reading? + writing << sel.to_io if sel.writing? + rescue Exception => error + puts "Error: #{error}" + puts error.backtrace.join("\n"); + # @selectables.delete(sel.fileno) + end + end + end + + read_from, write_to = IO.select(reading, writing, [], 0) + + unless read_from.nil? + read_from.each do |r| + sel = @selectables[r.fileno] + sel.readable unless sel.nil? || sel.closed? + end + end + + begin + unless write_to.nil? + write_to.each do |w| + sel = @selectables[w.fileno] + sel.writable unless sel.nil? || sel.closed? + end + end + + end + end + +end http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/fb1d981e/examples/ruby/lib/qpid_examples.rb ---------------------------------------------------------------------- diff --git a/examples/ruby/lib/qpid_examples.rb b/examples/ruby/lib/qpid_examples.rb new file mode 100644 index 0000000..8503fbe --- /dev/null +++ b/examples/ruby/lib/qpid_examples.rb @@ -0,0 +1,28 @@ +#-- +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +#++ + +require "qpid_proton" + +require "selectable" +require "driver" +require "socket" +require "monitor" + +include Socket::Constants + http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/fb1d981e/examples/ruby/lib/selectable.rb ---------------------------------------------------------------------- diff --git a/examples/ruby/lib/selectable.rb b/examples/ruby/lib/selectable.rb new file mode 100644 index 0000000..779ea24 --- /dev/null +++ b/examples/ruby/lib/selectable.rb @@ -0,0 +1,120 @@ +#-- +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +#++ + +class Selectable + + attr_reader :transport + + def initialize(transport, socket) + @transport = transport + @socket = socket + @socket.autoclose = true + @write_done = false + @read_done = false + end + + def closed? + return true if @socket.closed? + return false if !@read_done && !@write_done + @socket.close + true + end + + def fileno + @socket.fileno unless @socket.closed? + end + + def to_io + @socket + end + + def reading? + return false if @read_done + c = @transport.capacity + if c > 0 + return true + elsif c < 0 + @read_done = true + return false + else + return false + end + end + + def writing? + return false if @write_done + begin + p = @transport.pending + if p > 0 + return true + elsif p < 0 + @write_done = true + return false + else + return false + end + rescue Qpid::Proton::TransportError => error + @write_done = true + return false + end + end + + def readable + c = @transport.capacity + if c > 0 + begin + data = @socket.recv(c) + if data + @transport.push(data) + else + @transport.close_tail + end + rescue Exception => error + puts "read error; #{error}" + @transport.close_tail + @read_done = true + end + elsif c < 0 + @read_done = true + end + end + + def writable + begin + p = @transport.pending + if p > 0 + data = @transport.peek(p) + n = @socket.send(data, 0) + @transport.pop(n) + elsif p < 0 + @write_done = true + end + rescue Exception => error + puts "write error: #{error}" + puts error.backtrace.join("\n") + @transport.close_head + @write_done = true + end + end + + def tick(now) + @transport.tick(now) + end + +end --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org