Author: rhs Date: Mon Oct 13 20:51:05 2014 New Revision: 1631544 URL: http://svn.apache.org/r1631544 Log: added engine examples for proton-j
Added: qpid/proton/trunk/examples/engine/ qpid/proton/trunk/examples/engine/java/ qpid/proton/trunk/examples/engine/java/LICENSE qpid/proton/trunk/examples/engine/java/drain (with props) qpid/proton/trunk/examples/engine/java/pom.xml qpid/proton/trunk/examples/engine/java/server (with props) qpid/proton/trunk/examples/engine/java/spout (with props) qpid/proton/trunk/examples/engine/java/src/ qpid/proton/trunk/examples/engine/java/src/main/ qpid/proton/trunk/examples/engine/java/src/main/java/ qpid/proton/trunk/examples/engine/java/src/main/java/org/ qpid/proton/trunk/examples/engine/java/src/main/java/org/apache/ qpid/proton/trunk/examples/engine/java/src/main/java/org/apache/qpid/ qpid/proton/trunk/examples/engine/java/src/main/java/org/apache/qpid/proton/ qpid/proton/trunk/examples/engine/java/src/main/java/org/apache/qpid/proton/examples/ qpid/proton/trunk/examples/engine/java/src/main/java/org/apache/qpid/proton/examples/Drain.java qpid/proton/trunk/examples/engine/java/src/main/java/org/apache/qpid/proton/examples/Driver.java qpid/proton/trunk/examples/engine/java/src/main/java/org/apache/qpid/proton/examples/FlowController.java qpid/proton/trunk/examples/engine/java/src/main/java/org/apache/qpid/proton/examples/Handshaker.java qpid/proton/trunk/examples/engine/java/src/main/java/org/apache/qpid/proton/examples/Message.java qpid/proton/trunk/examples/engine/java/src/main/java/org/apache/qpid/proton/examples/Pool.java qpid/proton/trunk/examples/engine/java/src/main/java/org/apache/qpid/proton/examples/Router.java qpid/proton/trunk/examples/engine/java/src/main/java/org/apache/qpid/proton/examples/Server.java qpid/proton/trunk/examples/engine/java/src/main/java/org/apache/qpid/proton/examples/Spout.java Added: qpid/proton/trunk/examples/engine/java/LICENSE URL: http://svn.apache.org/viewvc/qpid/proton/trunk/examples/engine/java/LICENSE?rev=1631544&view=auto ============================================================================== --- 
qpid/proton/trunk/examples/engine/java/LICENSE (added) +++ qpid/proton/trunk/examples/engine/java/LICENSE Mon Oct 13 20:51:05 2014 @@ -0,0 +1,201 @@ +Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). 
+ + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. 
Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative 
Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. 
Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. 
+ + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "{}" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright {yyyy} {name of copyright owner} + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
\ No newline at end of file Added: qpid/proton/trunk/examples/engine/java/drain URL: http://svn.apache.org/viewvc/qpid/proton/trunk/examples/engine/java/drain?rev=1631544&view=auto ============================================================================== --- qpid/proton/trunk/examples/engine/java/drain (added) +++ qpid/proton/trunk/examples/engine/java/drain Mon Oct 13 20:51:05 2014 @@ -0,0 +1,2 @@ +#!/bin/bash +mvn -q -e exec:java -Dexec.mainClass=org.apache.qpid.proton.examples.Drain -Dexec.args="$@" Propchange: qpid/proton/trunk/examples/engine/java/drain ------------------------------------------------------------------------------ svn:executable = * Added: qpid/proton/trunk/examples/engine/java/pom.xml URL: http://svn.apache.org/viewvc/qpid/proton/trunk/examples/engine/java/pom.xml?rev=1631544&view=auto ============================================================================== --- qpid/proton/trunk/examples/engine/java/pom.xml (added) +++ qpid/proton/trunk/examples/engine/java/pom.xml Mon Oct 13 20:51:05 2014 @@ -0,0 +1,40 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <parent> + <groupId>org.apache.qpid</groupId> + <artifactId>proton-project</artifactId> + <version>1.0-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>proton-j-demo</artifactId> + <name>proton-j-demo</name> + + <dependencies> + <dependency> + <groupId>org.apache.qpid</groupId> + <artifactId>proton-j</artifactId> + <version>${project.parent.version}</version> + </dependency> + </dependencies> + + <scm> + <url>http://svn.apache.org/viewvc/qpid/proton/</url> + </scm> +</project> Added: qpid/proton/trunk/examples/engine/java/server URL: http://svn.apache.org/viewvc/qpid/proton/trunk/examples/engine/java/server?rev=1631544&view=auto ============================================================================== --- qpid/proton/trunk/examples/engine/java/server (added) +++ qpid/proton/trunk/examples/engine/java/server Mon Oct 13 20:51:05 2014 @@ -0,0 +1,2 @@ +#!/bin/bash +mvn -q -e exec:java -Dexec.mainClass=org.apache.qpid.proton.examples.Server -Dexec.args="$@" Propchange: qpid/proton/trunk/examples/engine/java/server ------------------------------------------------------------------------------ svn:executable = * Added: qpid/proton/trunk/examples/engine/java/spout URL: http://svn.apache.org/viewvc/qpid/proton/trunk/examples/engine/java/spout?rev=1631544&view=auto ============================================================================== --- qpid/proton/trunk/examples/engine/java/spout (added) +++ qpid/proton/trunk/examples/engine/java/spout Mon Oct 13 20:51:05 2014 @@ -0,0 +1,2 @@ +#!/bin/bash +mvn -q -e exec:java -Dexec.mainClass=org.apache.qpid.proton.examples.Spout -Dexec.args="$@" Propchange: qpid/proton/trunk/examples/engine/java/spout ------------------------------------------------------------------------------ 
svn:executable = * Added: qpid/proton/trunk/examples/engine/java/src/main/java/org/apache/qpid/proton/examples/Drain.java URL: http://svn.apache.org/viewvc/qpid/proton/trunk/examples/engine/java/src/main/java/org/apache/qpid/proton/examples/Drain.java?rev=1631544&view=auto ============================================================================== --- qpid/proton/trunk/examples/engine/java/src/main/java/org/apache/qpid/proton/examples/Drain.java (added) +++ qpid/proton/trunk/examples/engine/java/src/main/java/org/apache/qpid/proton/examples/Drain.java Mon Oct 13 20:51:05 2014 @@ -0,0 +1,128 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ +package org.apache.qpid.proton.examples; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.qpid.proton.engine.BaseHandler; +import org.apache.qpid.proton.engine.Collector; +import org.apache.qpid.proton.engine.Connection; +import org.apache.qpid.proton.engine.Delivery; +import org.apache.qpid.proton.engine.Event; +import org.apache.qpid.proton.engine.Link; +import org.apache.qpid.proton.engine.Receiver; + +public class Drain extends BaseHandler { + + private int count; + private boolean block; + private int received; + private boolean quiet; + + public Drain(int count, boolean block, boolean quiet) { + this.count = count; + this.block = block; + this.quiet = quiet; + } + + @Override + public void onLinkLocalOpen(Event evt) { + Link link = evt.getLink(); + if (link instanceof Receiver) { + Receiver receiver = (Receiver) link; + + if (block) { + receiver.flow(count); + } else { + receiver.drain(count); + } + } + } + + @Override + public void onLinkFlow(Event evt) { + Link link = evt.getLink(); + if (link instanceof Receiver) { + Receiver receiver = (Receiver) link; + + if (!receiver.draining()) { + receiver.getSession().getConnection().close(); + } + } + } + + @Override + public void onDelivery(Event evt) { + Delivery dlv = evt.getDelivery(); + if (dlv.getLink() instanceof Receiver) { + Receiver receiver = (Receiver) dlv.getLink(); + + if (!dlv.isPartial()) { + byte[] bytes = new byte[dlv.pending()]; + receiver.recv(bytes, 0, bytes.length); + Message msg = new Message(bytes); + + if (!quiet) { + System.out.println(String.format("Got message: %s", msg)); + } + received++; + dlv.settle(); + } + + if ((received >= count) || (!block && !receiver.draining())) { + receiver.getSession().getConnection().close(); + } + } + } + + @Override + public void onConnectionRemoteClose(Event evt) { + System.out.println(String.format("Got %s messages", received)); + } + + public static void main(String[] argv) throws Exception { + List<String> 
switches = new ArrayList<String>(); + List<String> args = new ArrayList<String>(); + for (String s : argv) { + if (s.startsWith("-")) { + switches.add(s); + } else { + args.add(s); + } + } + + boolean quiet = switches.contains("-q"); + String address = args.isEmpty() || !args.get(0).startsWith("/") ? "//localhost" : args.remove(0); + int count = args.isEmpty() ? 1 : Integer.parseInt(args.remove(0)); + boolean block = switches.contains("-b"); + + Collector collector = Collector.Factory.create(); + + Drain drain = new Drain(count, block, quiet); + Driver driver = new Driver(collector, drain); + + Pool pool = new Pool(collector); + pool.incoming(address, null); + + driver.run(); + } +} Added: qpid/proton/trunk/examples/engine/java/src/main/java/org/apache/qpid/proton/examples/Driver.java URL: http://svn.apache.org/viewvc/qpid/proton/trunk/examples/engine/java/src/main/java/org/apache/qpid/proton/examples/Driver.java?rev=1631544&view=auto ============================================================================== --- qpid/proton/trunk/examples/engine/java/src/main/java/org/apache/qpid/proton/examples/Driver.java (added) +++ qpid/proton/trunk/examples/engine/java/src/main/java/org/apache/qpid/proton/examples/Driver.java Mon Oct 13 20:51:05 2014 @@ -0,0 +1,262 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.proton.examples; + +import org.apache.qpid.proton.engine.BaseHandler; +import org.apache.qpid.proton.engine.Connection; +import org.apache.qpid.proton.engine.Collector; +import org.apache.qpid.proton.engine.EndpointState; +import org.apache.qpid.proton.engine.Event; +import org.apache.qpid.proton.engine.Handler; +import org.apache.qpid.proton.engine.Sasl; +import org.apache.qpid.proton.engine.Transport; +import org.apache.qpid.proton.engine.TransportException; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.StandardSocketOptions; +import java.nio.ByteBuffer; +import java.nio.channels.Selector; +import java.nio.channels.SelectionKey; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; + + +/** + * Driver + * + */ + +public class Driver extends BaseHandler +{ + + final private Collector collector; + final private Handler[] handlers; + final private Selector selector; + + public Driver(Collector collector, Handler ... handlers) throws IOException { + this.collector = collector; + this.handlers = handlers; + this.selector = Selector.open(); + } + + public void listen(String host, int port) throws IOException { + new Acceptor(host, port); + } + + public void run() throws IOException { + while (true) { + processEvents(); + + // I don't know if there is a better way to do this, but + // the only way canceled selection keys are removed from + // the key set is via a select operation, so we do this + // first to figure out whether we should exit. Without + // this we would block indefinitely when there are only + // cancelled keys remaining. 
+ selector.selectNow(); + if (selector.keys().isEmpty()) { + selector.close(); + return; + } + + selector.selectedKeys().clear(); + selector.select(); + + for (SelectionKey key : selector.selectedKeys()) { + Selectable selectable = (Selectable) key.attachment(); + selectable.selected(); + } + } + } + + public void processEvents() { + while (true) { + Event ev = collector.peek(); + if (ev == null) break; + ev.dispatch(this); + for (Handler h : handlers) { + ev.dispatch(h); + } + collector.pop(); + } + } + + @Override + public void onTransport(Event evt) { + Transport transport = evt.getTransport(); + ChannelHandler ch = (ChannelHandler) transport.getContext(); + ch.selected(); + } + + @Override + public void onConnectionLocalOpen(Event evt) { + Connection conn = evt.getConnection(); + if (conn.getRemoteState() == EndpointState.UNINITIALIZED) { + try { + new Connector(conn); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + + private interface Selectable { + void selected() throws IOException; + } + + private class Acceptor implements Selectable { + + final private ServerSocketChannel socket; + final private SelectionKey key; + + Acceptor(String host, int port) throws IOException { + socket = ServerSocketChannel.open(); + socket.configureBlocking(false); + socket.bind(new InetSocketAddress(host, port)); + socket.setOption(StandardSocketOptions.SO_REUSEADDR, true); + key = socket.register(selector, SelectionKey.OP_ACCEPT, this); + } + + public void selected() throws IOException { + SocketChannel sock = socket.accept(); + System.out.println("ACCEPTED: " + sock); + Connection conn = Connection.Factory.create(); + conn.collect(collector); + Transport transport = Transport.Factory.create(); + Sasl sasl = transport.sasl(); + sasl.setMechanisms("ANONYMOUS"); + sasl.server(); + sasl.done(Sasl.PN_SASL_OK); + transport.bind(conn); + new ChannelHandler(sock, SelectionKey.OP_READ, transport); + } + } + + private class ChannelHandler implements Selectable 
{ + + final SocketChannel socket; + final SelectionKey key; + final Transport transport; + + ChannelHandler(SocketChannel socket, int ops, Transport transport) throws IOException { + this.socket = socket; + socket.configureBlocking(false); + key = socket.register(selector, ops, this); + this.transport = transport; + transport.setContext(this); + } + + boolean update() { + if (socket.isConnected()) { + int c = transport.capacity(); + int p = transport.pending(); + if (key.isValid()) { + key.interestOps((c != 0 ? SelectionKey.OP_READ : 0) | + (p > 0 ? SelectionKey.OP_WRITE : 0)); + } + if (c < 0 && p < 0) { + return true; + } else { + return false; + } + } else { + return false; + } + } + + public void selected() { + if (!key.isValid()) { return; } + + try { + if (key.isConnectable()) { + System.out.println("CONNECTED: " + socket); + socket.finishConnect(); + } + + if (key.isReadable()) { + int c = transport.capacity(); + if (c > 0) { + ByteBuffer tail = transport.tail(); + int n = socket.read(tail); + if (n > 0) { + try { + transport.process(); + } catch (TransportException e) { + e.printStackTrace(); + } + } else if (n < 0) { + transport.close_tail(); + } + } + } + + if (key.isWritable()) { + int p = transport.pending(); + if (p > 0) { + ByteBuffer head = transport.head(); + int n = socket.write(head); + if (n > 0) { + transport.pop(n); + } else if (n < 0) { + transport.close_head(); + } + } + } + + if (update()) { + transport.unbind(); + System.out.println("CLOSING: " + socket); + socket.close(); + } + } catch (IOException e) { + transport.unbind(); + System.out.println(String.format("CLOSING(%s): %s", e, socket)); + try { + socket.close(); + } catch (IOException e2) { + throw new RuntimeException(e2); + } + } + + } + + } + + private static Transport makeTransport(Connection conn) { + Transport transport = Transport.Factory.create(); + Sasl sasl = transport.sasl(); + sasl.setMechanisms("ANONYMOUS"); + sasl.client(); + transport.bind(conn); + return transport; + } 
+ + private class Connector extends ChannelHandler { + + Connector(Connection conn) throws IOException { + super(SocketChannel.open(), SelectionKey.OP_CONNECT, makeTransport(conn)); + System.out.println("CONNECTING: " + conn.getHostname()); + socket.connect(new InetSocketAddress(conn.getHostname(), 5672)); + } + } + +} Added: qpid/proton/trunk/examples/engine/java/src/main/java/org/apache/qpid/proton/examples/FlowController.java URL: http://svn.apache.org/viewvc/qpid/proton/trunk/examples/engine/java/src/main/java/org/apache/qpid/proton/examples/FlowController.java?rev=1631544&view=auto ============================================================================== --- qpid/proton/trunk/examples/engine/java/src/main/java/org/apache/qpid/proton/examples/FlowController.java (added) +++ qpid/proton/trunk/examples/engine/java/src/main/java/org/apache/qpid/proton/examples/FlowController.java Mon Oct 13 20:51:05 2014 @@ -0,0 +1,80 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ +package org.apache.qpid.proton.examples; + +import org.apache.qpid.proton.engine.BaseHandler; +import org.apache.qpid.proton.engine.Delivery; +import org.apache.qpid.proton.engine.Event; +import org.apache.qpid.proton.engine.Link; +import org.apache.qpid.proton.engine.Receiver; + +/** + * FlowController + * + */ + +public class FlowController extends BaseHandler +{ + + final private int window; + + public FlowController(int window) { + this.window = window; + } + + private void topUp(Receiver rcv) { + int delta = window - rcv.getCredit(); + rcv.flow(delta); + } + + @Override + public void onLinkLocalOpen(Event evt) { + Link link = evt.getLink(); + if (link instanceof Receiver) { + topUp((Receiver) link); + } + } + + @Override + public void onLinkRemoteOpen(Event evt) { + Link link = evt.getLink(); + if (link instanceof Receiver) { + topUp((Receiver) link); + } + } + + @Override + public void onLinkFlow(Event evt) { + Link link = evt.getLink(); + if (link instanceof Receiver) { + topUp((Receiver) link); + } + } + + @Override + public void onDelivery(Event evt) { + Link link = evt.getLink(); + if (link instanceof Receiver) { + topUp((Receiver) link); + } + } + +} Added: qpid/proton/trunk/examples/engine/java/src/main/java/org/apache/qpid/proton/examples/Handshaker.java URL: http://svn.apache.org/viewvc/qpid/proton/trunk/examples/engine/java/src/main/java/org/apache/qpid/proton/examples/Handshaker.java?rev=1631544&view=auto ============================================================================== --- qpid/proton/trunk/examples/engine/java/src/main/java/org/apache/qpid/proton/examples/Handshaker.java (added) +++ qpid/proton/trunk/examples/engine/java/src/main/java/org/apache/qpid/proton/examples/Handshaker.java Mon Oct 13 20:51:05 2014 @@ -0,0 +1,88 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.proton.examples; + +import org.apache.qpid.proton.engine.BaseHandler; +import org.apache.qpid.proton.engine.Connection; +import org.apache.qpid.proton.engine.EndpointState; +import org.apache.qpid.proton.engine.Event; +import org.apache.qpid.proton.engine.Link; +import org.apache.qpid.proton.engine.Session; + +/** + * Handshaker + * + */ + +public class Handshaker extends BaseHandler +{ + + @Override + public void onConnectionRemoteOpen(Event evt) { + Connection conn = evt.getConnection(); + if (conn.getLocalState() == EndpointState.UNINITIALIZED) { + conn.open(); + } + } + + @Override + public void onSessionRemoteOpen(Event evt) { + Session ssn = evt.getSession(); + if (ssn.getLocalState() == EndpointState.UNINITIALIZED) { + ssn.open(); + } + } + + @Override + public void onLinkRemoteOpen(Event evt) { + Link link = evt.getLink(); + if (link.getLocalState() == EndpointState.UNINITIALIZED) { + link.setSource(link.getRemoteSource()); + link.setTarget(link.getRemoteTarget()); + link.open(); + } + } + + @Override + public void onConnectionRemoteClose(Event evt) { + Connection conn = evt.getConnection(); + if (conn.getLocalState() != EndpointState.CLOSED) { + conn.close(); + } + } + + @Override + public void onSessionRemoteClose(Event evt) { + 
/**
 * Message
 *
 * Thin wrapper around the encoded bytes of a message. The byte array is
 * stored and handed back by reference; no defensive copy is made.
 */
public class Message
{
    private final byte[] bytes;

    /**
     * These bytes are expected to be AMQP encoded.
     *
     * @param bytes the already-encoded message
     */
    public Message(byte[] bytes) {
        this.bytes = bytes;
    }

    /**
     * Fixed header written in front of an encoded string: a described type
     * (0x00) whose descriptor is the small ulong (0x53) 0x77, followed by the
     * str32-utf8 constructor (0xb1).
     */
    private static final byte[] PREFIX = {(byte)0x00, (byte)0x53, (byte)0x77, (byte)0xb1};

    /**
     * Encodes a string as PREFIX, a 4-byte big-endian length and the UTF-8
     * bytes of the text.
     */
    private static byte[] encodeString(String string) {
        // The 0xb1 constructor promises UTF-8, so never rely on the platform
        // default charset here.
        byte[] utf8 = string.getBytes(java.nio.charset.StandardCharsets.UTF_8);
        byte[] result = new byte[PREFIX.length + 4 + utf8.length];
        ByteBuffer bbuf = ByteBuffer.wrap(result);
        bbuf.put(PREFIX);
        bbuf.putInt(utf8.length);
        bbuf.put(utf8);
        return result;
    }

    /**
     * Creates a message whose body is the given string.
     *
     * @param string the text to encode
     */
    public Message(String string) {
        // XXX: special case string encoding for now
        this(encodeString(string));
    }

    /**
     * @return the encoded bytes (the internal array, not a copy)
     */
    public byte[] getBytes() {
        return bytes;
    }

    /**
     * Renders the bytes for logging: printable ASCII (32..126) is shown as
     * is, every other byte as a two-digit {@code \xNN} escape.
     */
    @Override
    public String toString() {
        StringBuilder bld = new StringBuilder();
        bld.append("Message(");
        for (byte b : bytes) {
            if (b >= 32 && b < 127) {
                bld.append((char) b);
            } else {
                bld.append("\\x");
                String hex = Integer.toHexString(0xFF & b);
                if (hex.length() < 2) {
                    bld.append("0");
                }
                bld.append(hex);
            }
        }
        bld.append(')');
        return bld.toString();
    }

}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.proton.examples; + +import org.apache.qpid.proton.engine.Collector; +import org.apache.qpid.proton.engine.Connection; +import org.apache.qpid.proton.engine.Event; +import org.apache.qpid.proton.engine.Session; +import org.apache.qpid.proton.engine.Link; +import org.apache.qpid.proton.engine.Sender; +import org.apache.qpid.proton.engine.Receiver; + +import org.apache.qpid.proton.amqp.messaging.Source; +import org.apache.qpid.proton.amqp.messaging.Target; + +import java.util.HashMap; +import java.util.Map; + +/** + * Pool + * + */ + +public class Pool +{ + + final private Collector collector; + final private Map<String,Connection> connections; + + final private LinkConstructor<Sender> outgoingConstructor = new LinkConstructor<Sender> () { + public Sender create(Session ssn, String remote, String local) { + return newOutgoing(ssn, remote, local); + } + }; + final private LinkConstructor<Receiver> incomingConstructor = new LinkConstructor<Receiver> () { + public Receiver create(Session ssn, String remote, String local) { + return newIncoming(ssn, remote, local); + } + }; + + final private LinkResolver<Sender> outgoingResolver; + final private LinkResolver<Receiver> incomingResolver; + + public Pool(Collector collector, final Router router) { + this.collector = collector; + connections = new HashMap<String,Connection>(); + + if (router != null) { + outgoingResolver = new LinkResolver<Sender>() { + public Sender resolve(String address) { + return router.getOutgoing(address).choose(); + } + }; + incomingResolver = new 
LinkResolver<Receiver>() { + public Receiver resolve(String address) { + return router.getIncoming(address).choose(); + } + }; + } else { + outgoingResolver = new LinkResolver<Sender>() { + public Sender resolve(String address) { return null; } + }; + incomingResolver = new LinkResolver<Receiver>() { + public Receiver resolve(String address) { return null; } + }; + } + } + + public Pool(Collector collector) { + this(collector, null); + } + + private <T extends Link> T resolve(String remote, String local, + LinkResolver<T> resolver, + LinkConstructor<T> constructor) { + String host = remote.substring(2).split("/", 2)[0]; + T link = resolver.resolve(remote); + if (link == null) { + Connection conn = connections.get(host); + if (conn == null) { + conn = Connection.Factory.create(); + conn.collect(collector); + conn.setHostname(host); + conn.open(); + connections.put(host, conn); + } + + Session ssn = conn.session(); + ssn.open(); + + link = constructor.create(ssn, remote, local); + link.open(); + } + return link; + } + + public Sender outgoing(String target, String source) { + return resolve(target, source, outgoingResolver, outgoingConstructor); + } + + public Receiver incoming(String source, String target) { + return resolve(source, target, incomingResolver, incomingConstructor); + } + + public Sender newOutgoing(Session ssn, String remote, String local) { + Sender snd = ssn.sender(String.format("%s-%s", local, remote)); + Source src = new Source(); + src.setAddress(local); + snd.setSource(src); + Target tgt = new Target(); + tgt.setAddress(remote); + snd.setTarget(tgt); + return snd; + } + + public Receiver newIncoming(Session ssn, String remote, String local) { + Receiver rcv = ssn.receiver(String.format("%s-%s", remote, local)); + Source src = new Source(); + src.setAddress(remote); + rcv.setSource(src); + Target tgt = new Target(); + tgt.setAddress(remote); + rcv.setTarget(tgt); + return rcv; + } + + public static interface LinkConstructor<T extends Link> { + T 
create(Session session, String remote, String local); + } + + public static interface LinkResolver<T extends Link> { + T resolve(String remote); + } + +} Added: qpid/proton/trunk/examples/engine/java/src/main/java/org/apache/qpid/proton/examples/Router.java URL: http://svn.apache.org/viewvc/qpid/proton/trunk/examples/engine/java/src/main/java/org/apache/qpid/proton/examples/Router.java?rev=1631544&view=auto ============================================================================== --- qpid/proton/trunk/examples/engine/java/src/main/java/org/apache/qpid/proton/examples/Router.java (added) +++ qpid/proton/trunk/examples/engine/java/src/main/java/org/apache/qpid/proton/examples/Router.java Mon Oct 13 20:51:05 2014 @@ -0,0 +1,191 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ +package org.apache.qpid.proton.examples; + +import org.apache.qpid.proton.amqp.transport.Source; +import org.apache.qpid.proton.amqp.transport.Target; +import org.apache.qpid.proton.engine.BaseHandler; +import org.apache.qpid.proton.engine.Event; +import org.apache.qpid.proton.engine.Link; +import org.apache.qpid.proton.engine.Receiver; +import org.apache.qpid.proton.engine.Sender; + + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ThreadLocalRandom; + + +/** + * Router + * + */ + +public class Router extends BaseHandler +{ + + public static class Routes<T extends Link> { + + List<T> routes = new ArrayList<T>(); + + void add(T route) { + routes.add(route); + } + + void remove(T route) { + routes.remove(route); + } + + int size() { + return routes.size(); + } + + public T choose() { + if (routes.isEmpty()) { return null; } + ThreadLocalRandom rand = ThreadLocalRandom.current(); + int idx = rand.nextInt(0, routes.size()); + return routes.get(idx); + } + + } + + private static final Routes<Sender> EMPTY_OUT = new Routes<Sender>(); + private static final Routes<Receiver> EMPTY_IN = new Routes<Receiver>(); + + final private Map<String,Routes<Sender>> outgoing = new HashMap<String,Routes<Sender>>(); + final private Map<String,Routes<Receiver>> incoming = new HashMap<String,Routes<Receiver>>(); + + public Router() {} + + private String getAddress(Source source) { + if (source == null) { + return null; + } else { + return source.getAddress(); + } + } + + private String getAddress(Target target) { + if (target == null) { + return null; + } else { + return target.getAddress(); + } + } + + public String getAddress(Sender snd) { + String source = getAddress(snd.getSource()); + String target = getAddress(snd.getTarget()); + return source != null ? 
source : target; + } + + public String getAddress(Receiver rcv) { + return getAddress(rcv.getTarget()); + } + + public Routes<Sender> getOutgoing(String address) { + Routes<Sender> routes = outgoing.get(address); + if (routes == null) { return EMPTY_OUT; } + return routes; + } + + public Routes<Receiver> getIncoming(String address) { + Routes<Receiver> routes = incoming.get(address); + if (routes == null) { return EMPTY_IN; } + return routes; + } + + private void add(Sender snd) { + String address = getAddress(snd); + Routes<Sender> routes = outgoing.get(address); + if (routes == null) { + routes = new Routes<Sender>(); + outgoing.put(address, routes); + } + routes.add(snd); + } + + private void remove(Sender snd) { + String address = getAddress(snd); + Routes<Sender> routes = outgoing.get(address); + if (routes != null) { + routes.remove(snd); + if (routes.size() == 0) { + outgoing.remove(address); + } + } + } + + private void add(Receiver rcv) { + String address = getAddress(rcv); + Routes<Receiver> routes = incoming.get(address); + if (routes == null) { + routes = new Routes<Receiver>(); + incoming.put(address, routes); + } + routes.add(rcv); + } + + private void remove(Receiver rcv) { + String address = getAddress(rcv); + Routes<Receiver> routes = incoming.get(address); + if (routes != null) { + routes.remove(rcv); + if (routes.size() == 0) { + incoming.remove(address); + } + } + } + + private void add(Link link) { + if (link instanceof Sender) { + add((Sender) link); + } else { + add((Receiver) link); + } + } + + private void remove(Link link) { + if (link instanceof Sender) { + remove((Sender) link); + } else { + remove((Receiver) link); + } + } + + @Override + public void onLinkLocalOpen(Event evt) { + add(evt.getLink()); + } + + @Override + public void onLinkLocalClose(Event evt) { + remove(evt.getLink()); + } + + @Override + public void onLinkFinal(Event evt) { + remove(evt.getLink()); + } + +} Added: 
qpid/proton/trunk/examples/engine/java/src/main/java/org/apache/qpid/proton/examples/Server.java URL: http://svn.apache.org/viewvc/qpid/proton/trunk/examples/engine/java/src/main/java/org/apache/qpid/proton/examples/Server.java?rev=1631544&view=auto ============================================================================== --- qpid/proton/trunk/examples/engine/java/src/main/java/org/apache/qpid/proton/examples/Server.java (added) +++ qpid/proton/trunk/examples/engine/java/src/main/java/org/apache/qpid/proton/examples/Server.java Mon Oct 13 20:51:05 2014 @@ -0,0 +1,179 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ +package org.apache.qpid.proton.examples; + +import org.apache.qpid.proton.amqp.messaging.Accepted; +import org.apache.qpid.proton.engine.BaseHandler; +import org.apache.qpid.proton.engine.Collector; +import org.apache.qpid.proton.engine.Delivery; +import org.apache.qpid.proton.engine.Event; +import org.apache.qpid.proton.engine.Link; +import org.apache.qpid.proton.engine.Receiver; +import org.apache.qpid.proton.engine.Sender; + +import java.io.IOException; + +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Deque; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Server + * + */ + +public class Server extends BaseHandler +{ + + private class MessageStore { + + Map<String,Deque<Message>> messages = new HashMap<String,Deque<Message>>(); + + void put(String address, Message message) { + Deque<Message> queue = messages.get(address); + if (queue == null) { + queue = new ArrayDeque<Message>(); + messages.put(address, queue); + } + queue.add(message); + } + + Message get(String address) { + Deque<Message> queue = messages.get(address); + if (queue == null) { return null; } + Message msg = queue.remove(); + if (queue.isEmpty()) { + messages.remove(address); + } + return msg; + } + + } + + final private MessageStore messages = new MessageStore(); + final private Router router; + private boolean quiet; + private int tag = 0; + + public Server(Router router, boolean quiet) { + this.router = router; + this.quiet = quiet; + } + + private byte[] nextTag() { + return String.format("%s", tag++).getBytes(); + } + + private int send(String address) { + return send(address, null); + } + + private int send(String address, Sender snd) { + if (snd == null) { + Router.Routes<Sender> routes = router.getOutgoing(address); + snd = routes.choose(); + if (snd == null) { + return 0; + } + } + + int count = 0; + while (snd.getCredit() > 0 && snd.getQueued() < 1024) { + Message msg = messages.get(address); + if (msg == null) 
{ + snd.drained(); + return count; + } + Delivery dlv = snd.delivery(nextTag()); + byte[] bytes = msg.getBytes(); + snd.send(bytes, 0, bytes.length); + dlv.settle(); + count++; + if (!quiet) { + System.out.println(String.format("Sent message(%s): %s", address, msg)); + } + } + + return count; + } + + @Override + public void onLinkFlow(Event evt) { + Link link = evt.getLink(); + if (link instanceof Sender) { + Sender snd = (Sender) link; + send(router.getAddress(snd), snd); + } + } + + @Override + public void onDelivery(Event evt) { + Delivery dlv = evt.getDelivery(); + Link link = dlv.getLink(); + if (link instanceof Sender) { + dlv.settle(); + } else { + Receiver rcv = (Receiver) link; + if (!dlv.isPartial()) { + byte[] bytes = new byte[dlv.pending()]; + rcv.recv(bytes, 0, bytes.length); + String address = router.getAddress(rcv); + Message message = new Message(bytes); + messages.put(address, message); + dlv.disposition(Accepted.getInstance()); + dlv.settle(); + if (!quiet) { + System.out.println(String.format("Got message(%s): %s", address, message)); + } + send(address); + } + } + } + + public static final void main(String[] argv) throws IOException { + List<String> switches = new ArrayList<String>(); + List<String> args = new ArrayList<String>(); + for (String s : argv) { + if (s.startsWith("-")) { + switches.add(s); + } else { + args.add(s); + } + } + + boolean quiet = switches.contains("-q"); + String host = !args.isEmpty() && !Character.isDigit(args.get(0).charAt(0)) ? + args.remove(0) : "localhost"; + int port = !args.isEmpty() ? 
Integer.parseInt(args.remove(0)) : 5672; + + Collector collector = Collector.Factory.create(); + Router router = new Router(); + Driver driver = new Driver(collector, new Handshaker(), + new FlowController(1024), router, + new Server(router, quiet)); + driver.listen(host, port); + driver.run(); + } + +} Added: qpid/proton/trunk/examples/engine/java/src/main/java/org/apache/qpid/proton/examples/Spout.java URL: http://svn.apache.org/viewvc/qpid/proton/trunk/examples/engine/java/src/main/java/org/apache/qpid/proton/examples/Spout.java?rev=1631544&view=auto ============================================================================== --- qpid/proton/trunk/examples/engine/java/src/main/java/org/apache/qpid/proton/examples/Spout.java (added) +++ qpid/proton/trunk/examples/engine/java/src/main/java/org/apache/qpid/proton/examples/Spout.java Mon Oct 13 20:51:05 2014 @@ -0,0 +1,116 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ +package org.apache.qpid.proton.examples; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.qpid.proton.engine.BaseHandler; +import org.apache.qpid.proton.engine.Collector; +import org.apache.qpid.proton.engine.Connection; +import org.apache.qpid.proton.engine.Delivery; +import org.apache.qpid.proton.engine.Event; +import org.apache.qpid.proton.engine.Link; +import org.apache.qpid.proton.engine.Sender; + +public class Spout extends BaseHandler +{ + private int count; + private int sent; + private int settled; + private boolean quiet; + + public Spout(int count, boolean quiet) { + this.count = count; + this.quiet = quiet; + } + + @Override + public void onLinkFlow(Event evt) { + Link link = evt.getLink(); + if (link instanceof Sender) { + Sender sender = (Sender) link; + while ((sent < count) && sender.getCredit() > 0) { + Delivery dlv = sender.delivery(String.format("spout-%s", sent).getBytes()); + + Message msg = new Message(String.format("Hello World! 
[%s]", sent)); + byte[] bytes = msg.getBytes(); + sender.send(bytes, 0, bytes.length); + sender.advance(); + + if (!quiet) { + System.out.println(String.format("Sent %s to %s: %s", new String(dlv.getTag()), + sender.getTarget().getAddress(), msg)); + } + sent++; + } + } + } + + @Override + public void onDelivery(Event evt) { + Delivery dlv = evt.getDelivery(); + if (dlv.remotelySettled()) { + if (!quiet) { + System.out.println(String.format("Settled %s: %s", new String(dlv.getTag()), dlv.getRemoteState())); + } + dlv.settle(); + settled++; + } + + if (settled >= count) { + dlv.getLink().getSession().getConnection().close(); + } + } + + @Override + public void onConnectionRemoteClose(Event evt) { + System.out.println("settled: " + settled); + } + + public static void main(String[] argv) throws Exception { + List<String> switches = new ArrayList<String>(); + List<String> args = new ArrayList<String>(); + for (String s : argv) { + if (s.startsWith("-")) { + switches.add(s); + } else { + args.add(s); + } + } + + boolean quiet = switches.contains("-q"); + String address = !args.isEmpty() && args.get(0).startsWith("/") ? + args.remove(0) : "//localhost"; + int count = !args.isEmpty() ? Integer.parseInt(args.remove(0)) : 1; + + Collector collector = Collector.Factory.create(); + + Spout spout = new Spout(count, quiet); + + Driver driver = new Driver(collector, spout); + + Pool pool = new Pool(collector); + pool.outgoing(address, null); + + driver.run(); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org