Author: rhs
Date: Mon Oct 13 20:51:05 2014
New Revision: 1631544

URL: http://svn.apache.org/r1631544
Log:
added engine examples for proton-j

Added:
    qpid/proton/trunk/examples/engine/
    qpid/proton/trunk/examples/engine/java/
    qpid/proton/trunk/examples/engine/java/LICENSE
    qpid/proton/trunk/examples/engine/java/drain   (with props)
    qpid/proton/trunk/examples/engine/java/pom.xml
    qpid/proton/trunk/examples/engine/java/server   (with props)
    qpid/proton/trunk/examples/engine/java/spout   (with props)
    qpid/proton/trunk/examples/engine/java/src/
    qpid/proton/trunk/examples/engine/java/src/main/
    qpid/proton/trunk/examples/engine/java/src/main/java/
    qpid/proton/trunk/examples/engine/java/src/main/java/org/
    qpid/proton/trunk/examples/engine/java/src/main/java/org/apache/
    qpid/proton/trunk/examples/engine/java/src/main/java/org/apache/qpid/
    qpid/proton/trunk/examples/engine/java/src/main/java/org/apache/qpid/proton/
    
qpid/proton/trunk/examples/engine/java/src/main/java/org/apache/qpid/proton/examples/
    
qpid/proton/trunk/examples/engine/java/src/main/java/org/apache/qpid/proton/examples/Drain.java
    
qpid/proton/trunk/examples/engine/java/src/main/java/org/apache/qpid/proton/examples/Driver.java
    
qpid/proton/trunk/examples/engine/java/src/main/java/org/apache/qpid/proton/examples/FlowController.java
    
qpid/proton/trunk/examples/engine/java/src/main/java/org/apache/qpid/proton/examples/Handshaker.java
    
qpid/proton/trunk/examples/engine/java/src/main/java/org/apache/qpid/proton/examples/Message.java
    
qpid/proton/trunk/examples/engine/java/src/main/java/org/apache/qpid/proton/examples/Pool.java
    
qpid/proton/trunk/examples/engine/java/src/main/java/org/apache/qpid/proton/examples/Router.java
    
qpid/proton/trunk/examples/engine/java/src/main/java/org/apache/qpid/proton/examples/Server.java
    
qpid/proton/trunk/examples/engine/java/src/main/java/org/apache/qpid/proton/examples/Spout.java

Added: qpid/proton/trunk/examples/engine/java/LICENSE
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/examples/engine/java/LICENSE?rev=1631544&view=auto
==============================================================================
--- qpid/proton/trunk/examples/engine/java/LICENSE (added)
+++ qpid/proton/trunk/examples/engine/java/LICENSE Mon Oct 13 20:51:05 2014
@@ -0,0 +1,201 @@
+Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner. For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted. If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License. You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8. Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "{}"
+      replaced with your own identifying information. (Don't include
+      the brackets!)  The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright {yyyy} {name of copyright owner}
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
\ No newline at end of file

Added: qpid/proton/trunk/examples/engine/java/drain
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/examples/engine/java/drain?rev=1631544&view=auto
==============================================================================
--- qpid/proton/trunk/examples/engine/java/drain (added)
+++ qpid/proton/trunk/examples/engine/java/drain Mon Oct 13 20:51:05 2014
@@ -0,0 +1,2 @@
+#!/bin/bash
+mvn -q -e exec:java -Dexec.mainClass=org.apache.qpid.proton.examples.Drain 
-Dexec.args="$@"

Propchange: qpid/proton/trunk/examples/engine/java/drain
------------------------------------------------------------------------------
    svn:executable = *

Added: qpid/proton/trunk/examples/engine/java/pom.xml
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/examples/engine/java/pom.xml?rev=1631544&view=auto
==============================================================================
--- qpid/proton/trunk/examples/engine/java/pom.xml (added)
+++ qpid/proton/trunk/examples/engine/java/pom.xml Mon Oct 13 20:51:05 2014
@@ -0,0 +1,40 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+  
+  http://www.apache.org/licenses/LICENSE-2.0
+  
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd";>
+  <parent>
+    <groupId>org.apache.qpid</groupId>
+    <artifactId>proton-project</artifactId>
+    <version>1.0-SNAPSHOT</version>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>proton-j-demo</artifactId>
+  <name>proton-j-demo</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.qpid</groupId>
+      <artifactId>proton-j</artifactId>
+      <version>${project.parent.version}</version>
+    </dependency>
+  </dependencies>
+
+  <scm>
+    <url>http://svn.apache.org/viewvc/qpid/proton/</url>
+  </scm>
+</project>

Added: qpid/proton/trunk/examples/engine/java/server
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/examples/engine/java/server?rev=1631544&view=auto
==============================================================================
--- qpid/proton/trunk/examples/engine/java/server (added)
+++ qpid/proton/trunk/examples/engine/java/server Mon Oct 13 20:51:05 2014
@@ -0,0 +1,2 @@
+#!/bin/bash
+mvn -q -e exec:java -Dexec.mainClass=org.apache.qpid.proton.examples.Server 
-Dexec.args="$@"

Propchange: qpid/proton/trunk/examples/engine/java/server
------------------------------------------------------------------------------
    svn:executable = *

Added: qpid/proton/trunk/examples/engine/java/spout
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/examples/engine/java/spout?rev=1631544&view=auto
==============================================================================
--- qpid/proton/trunk/examples/engine/java/spout (added)
+++ qpid/proton/trunk/examples/engine/java/spout Mon Oct 13 20:51:05 2014
@@ -0,0 +1,2 @@
+#!/bin/bash
+mvn -q -e exec:java -Dexec.mainClass=org.apache.qpid.proton.examples.Spout 
-Dexec.args="$@"

Propchange: qpid/proton/trunk/examples/engine/java/spout
------------------------------------------------------------------------------
    svn:executable = *

Added: 
qpid/proton/trunk/examples/engine/java/src/main/java/org/apache/qpid/proton/examples/Drain.java
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/examples/engine/java/src/main/java/org/apache/qpid/proton/examples/Drain.java?rev=1631544&view=auto
==============================================================================
--- 
qpid/proton/trunk/examples/engine/java/src/main/java/org/apache/qpid/proton/examples/Drain.java
 (added)
+++ 
qpid/proton/trunk/examples/engine/java/src/main/java/org/apache/qpid/proton/examples/Drain.java
 Mon Oct 13 20:51:05 2014
@@ -0,0 +1,128 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.proton.examples;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.qpid.proton.engine.BaseHandler;
+import org.apache.qpid.proton.engine.Collector;
+import org.apache.qpid.proton.engine.Connection;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.Event;
+import org.apache.qpid.proton.engine.Link;
+import org.apache.qpid.proton.engine.Receiver;
+
+public class Drain extends BaseHandler {
+
+    private int count;
+    private boolean block;
+    private int received;
+    private boolean quiet;
+
+    public Drain(int count, boolean block, boolean quiet) {
+        this.count = count;
+        this.block = block;
+        this.quiet = quiet;
+    }
+
+    @Override
+    public void onLinkLocalOpen(Event evt) {
+        Link link = evt.getLink();
+        if (link instanceof Receiver) {
+            Receiver receiver = (Receiver) link;
+
+            if (block) {
+                receiver.flow(count);
+            } else {
+                receiver.drain(count);
+            }
+        }
+    }
+
+    @Override
+    public void onLinkFlow(Event evt) {
+        Link link = evt.getLink();
+        if (link instanceof Receiver) {
+            Receiver receiver = (Receiver) link;
+
+            if (!receiver.draining()) {
+                receiver.getSession().getConnection().close();
+            }
+        }
+    }
+
+    @Override
+    public void onDelivery(Event evt) {
+        Delivery dlv = evt.getDelivery();
+        if (dlv.getLink() instanceof Receiver) {
+            Receiver receiver = (Receiver) dlv.getLink();
+
+            if (!dlv.isPartial()) {
+                byte[] bytes = new byte[dlv.pending()];
+                receiver.recv(bytes, 0, bytes.length);
+                Message msg = new Message(bytes);
+
+                if (!quiet) {
+                    System.out.println(String.format("Got message: %s", msg));
+                }
+                received++;
+                dlv.settle();
+            }
+
+            if ((received >= count) || (!block && !receiver.draining())) {
+                receiver.getSession().getConnection().close();
+            }
+        }
+    }
+
+    @Override
+    public void onConnectionRemoteClose(Event evt) {
+        System.out.println(String.format("Got %s messages", received));
+    }
+
+    public static void main(String[] argv) throws Exception {
+        List<String> switches = new ArrayList<String>();
+        List<String> args = new ArrayList<String>();
+        for (String s : argv) {
+            if (s.startsWith("-")) {
+                switches.add(s);
+            } else {
+                args.add(s);
+            }
+        }
+
+        boolean quiet = switches.contains("-q");
+        String address = args.isEmpty() || !args.get(0).startsWith("/") ? 
"//localhost" : args.remove(0);
+        int count = args.isEmpty() ? 1 : Integer.parseInt(args.remove(0));
+        boolean block = switches.contains("-b");
+
+        Collector collector = Collector.Factory.create();
+
+        Drain drain = new Drain(count, block, quiet);
+        Driver driver = new Driver(collector, drain);
+
+        Pool pool = new Pool(collector);
+        pool.incoming(address, null);
+
+        driver.run();
+    }
+}

Added: 
qpid/proton/trunk/examples/engine/java/src/main/java/org/apache/qpid/proton/examples/Driver.java
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/examples/engine/java/src/main/java/org/apache/qpid/proton/examples/Driver.java?rev=1631544&view=auto
==============================================================================
--- 
qpid/proton/trunk/examples/engine/java/src/main/java/org/apache/qpid/proton/examples/Driver.java
 (added)
+++ 
qpid/proton/trunk/examples/engine/java/src/main/java/org/apache/qpid/proton/examples/Driver.java
 Mon Oct 13 20:51:05 2014
@@ -0,0 +1,262 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.proton.examples;
+
+import org.apache.qpid.proton.engine.BaseHandler;
+import org.apache.qpid.proton.engine.Connection;
+import org.apache.qpid.proton.engine.Collector;
+import org.apache.qpid.proton.engine.EndpointState;
+import org.apache.qpid.proton.engine.Event;
+import org.apache.qpid.proton.engine.Handler;
+import org.apache.qpid.proton.engine.Sasl;
+import org.apache.qpid.proton.engine.Transport;
+import org.apache.qpid.proton.engine.TransportException;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.StandardSocketOptions;
+import java.nio.ByteBuffer;
+import java.nio.channels.Selector;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+
+
+/**
+ * Driver
+ *
+ */
+
+public class Driver extends BaseHandler
+{
+
+    final private Collector collector;
+    final private Handler[] handlers;
+    final private Selector selector;
+
+    public Driver(Collector collector, Handler ... handlers) throws 
IOException {
+        this.collector = collector;
+        this.handlers = handlers;
+        this.selector = Selector.open();
+    }
+
+    public void listen(String host, int port) throws IOException {
+        new Acceptor(host, port);
+    }
+
+    public void run() throws IOException {
+        while (true) {
+            processEvents();
+
+            // I don't know if there is a better way to do this, but
+            // the only way canceled selection keys are removed from
+            // the key set is via a select operation, so we do this
+            // first to figure out whether we should exit. Without
+            // this we would block indefinitely when there are only
+            // cancelled keys remaining.
+            selector.selectNow();
+            if (selector.keys().isEmpty()) {
+                selector.close();
+                return;
+            }
+
+            selector.selectedKeys().clear();
+            selector.select();
+
+            for (SelectionKey key : selector.selectedKeys()) {
+                Selectable selectable = (Selectable) key.attachment();
+                selectable.selected();
+            }
+        }
+    }
+
+    public void processEvents() {
+        while (true) {
+            Event ev = collector.peek();
+            if (ev == null) break;
+            ev.dispatch(this);
+            for (Handler h : handlers) {
+                ev.dispatch(h);
+            }
+            collector.pop();
+        }
+    }
+
+    @Override
+    public void onTransport(Event evt) {
+        Transport transport = evt.getTransport();
+        ChannelHandler ch = (ChannelHandler) transport.getContext();
+        ch.selected();
+    }
+
+    @Override
+    public void onConnectionLocalOpen(Event evt) {
+        Connection conn = evt.getConnection();
+        if (conn.getRemoteState() == EndpointState.UNINITIALIZED) {
+            try {
+                new Connector(conn);
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    private interface Selectable {
+        void selected() throws IOException;
+    }
+
+    private class Acceptor implements Selectable {
+
+        final private ServerSocketChannel socket;
+        final private SelectionKey key;
+
+        Acceptor(String host, int port) throws IOException {
+            socket = ServerSocketChannel.open();
+            socket.configureBlocking(false);
+            socket.bind(new InetSocketAddress(host, port));
+            socket.setOption(StandardSocketOptions.SO_REUSEADDR, true);
+            key = socket.register(selector, SelectionKey.OP_ACCEPT, this);
+        }
+
+        public void selected() throws IOException {
+            SocketChannel sock = socket.accept();
+            System.out.println("ACCEPTED: " + sock);
+            Connection conn = Connection.Factory.create();
+            conn.collect(collector);
+            Transport transport = Transport.Factory.create();
+            Sasl sasl = transport.sasl();
+            sasl.setMechanisms("ANONYMOUS");
+            sasl.server();
+            sasl.done(Sasl.PN_SASL_OK);
+            transport.bind(conn);
+            new ChannelHandler(sock, SelectionKey.OP_READ, transport);
+        }
+    }
+
+    private class ChannelHandler implements Selectable {
+
+        final SocketChannel socket;
+        final SelectionKey key;
+        final Transport transport;
+
+        ChannelHandler(SocketChannel socket, int ops, Transport transport) 
throws IOException {
+            this.socket = socket;
+            socket.configureBlocking(false);
+            key = socket.register(selector, ops, this);
+            this.transport = transport;
+            transport.setContext(this);
+        }
+
+        boolean update() {
+            if (socket.isConnected()) {
+                int c = transport.capacity();
+                int p = transport.pending();
+                if (key.isValid()) {
+                    key.interestOps((c != 0 ? SelectionKey.OP_READ : 0) |
+                                    (p > 0 ? SelectionKey.OP_WRITE : 0));
+                }
+                if (c < 0 && p < 0) {
+                    return true;
+                } else {
+                    return false;
+                }
+            } else {
+                return false;
+            }
+        }
+
+        public void selected() {
+            if (!key.isValid()) { return; }
+
+            try {
+                if (key.isConnectable()) {
+                    System.out.println("CONNECTED: " + socket);
+                    socket.finishConnect();
+                }
+
+                if (key.isReadable()) {
+                    int c = transport.capacity();
+                    if (c > 0) {
+                        ByteBuffer tail = transport.tail();
+                        int n = socket.read(tail);
+                        if (n > 0) {
+                            try {
+                                transport.process();
+                            } catch (TransportException e) {
+                                e.printStackTrace();
+                            }
+                        } else if (n < 0) {
+                            transport.close_tail();
+                        }
+                    }
+                }
+
+                if (key.isWritable()) {
+                    int p = transport.pending();
+                    if (p > 0) {
+                        ByteBuffer head = transport.head();
+                        int n = socket.write(head);
+                        if (n > 0) {
+                            transport.pop(n);
+                        } else if (n < 0) {
+                            transport.close_head();
+                        }
+                    }
+                }
+
+                if (update()) {
+                    transport.unbind();
+                    System.out.println("CLOSING: " + socket);
+                    socket.close();
+                }
+            } catch (IOException e) {
+                transport.unbind();
+                System.out.println(String.format("CLOSING(%s): %s", e, 
socket));
+                try {
+                    socket.close();
+                } catch (IOException e2) {
+                    throw new RuntimeException(e2);
+                }
+            }
+
+        }
+
+    }
+
+    private static Transport makeTransport(Connection conn) {
+        Transport transport = Transport.Factory.create();
+        Sasl sasl = transport.sasl();
+        sasl.setMechanisms("ANONYMOUS");
+        sasl.client();
+        transport.bind(conn);
+        return transport;
+    }
+
+    private class Connector extends ChannelHandler {
+
+        Connector(Connection conn) throws IOException {
+            super(SocketChannel.open(), SelectionKey.OP_CONNECT, 
makeTransport(conn));
+            System.out.println("CONNECTING: " + conn.getHostname());
+            socket.connect(new InetSocketAddress(conn.getHostname(), 5672));
+        }
+    }
+
+}

Added: 
qpid/proton/trunk/examples/engine/java/src/main/java/org/apache/qpid/proton/examples/FlowController.java
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/examples/engine/java/src/main/java/org/apache/qpid/proton/examples/FlowController.java?rev=1631544&view=auto
==============================================================================
--- 
qpid/proton/trunk/examples/engine/java/src/main/java/org/apache/qpid/proton/examples/FlowController.java
 (added)
+++ 
qpid/proton/trunk/examples/engine/java/src/main/java/org/apache/qpid/proton/examples/FlowController.java
 Mon Oct 13 20:51:05 2014
@@ -0,0 +1,80 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.proton.examples;
+
+import org.apache.qpid.proton.engine.BaseHandler;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.Event;
+import org.apache.qpid.proton.engine.Link;
+import org.apache.qpid.proton.engine.Receiver;
+
+/**
+ * FlowController
+ *
+ */
+
+public class FlowController extends BaseHandler
+{
+
+    final private int window;
+
+    public FlowController(int window) {
+        this.window = window;
+    }
+
+    private void topUp(Receiver rcv) {
+        int delta = window - rcv.getCredit();
+        rcv.flow(delta);
+    }
+
+    @Override
+    public void onLinkLocalOpen(Event evt) {
+        Link link = evt.getLink();
+        if (link instanceof Receiver) {
+            topUp((Receiver) link);
+        }
+    }
+
+    @Override
+    public void onLinkRemoteOpen(Event evt) {
+        Link link = evt.getLink();
+        if (link instanceof Receiver) {
+            topUp((Receiver) link);
+        }
+    }
+
+    @Override
+    public void onLinkFlow(Event evt) {
+        Link link = evt.getLink();
+        if (link instanceof Receiver) {
+            topUp((Receiver) link);
+        }
+    }
+
+    @Override
+    public void onDelivery(Event evt) {
+        Link link = evt.getLink();
+        if (link instanceof Receiver) {
+            topUp((Receiver) link);
+        }
+    }
+
+}

Added: 
qpid/proton/trunk/examples/engine/java/src/main/java/org/apache/qpid/proton/examples/Handshaker.java
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/examples/engine/java/src/main/java/org/apache/qpid/proton/examples/Handshaker.java?rev=1631544&view=auto
==============================================================================
--- 
qpid/proton/trunk/examples/engine/java/src/main/java/org/apache/qpid/proton/examples/Handshaker.java
 (added)
+++ 
qpid/proton/trunk/examples/engine/java/src/main/java/org/apache/qpid/proton/examples/Handshaker.java
 Mon Oct 13 20:51:05 2014
@@ -0,0 +1,88 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.proton.examples;
+
+import org.apache.qpid.proton.engine.BaseHandler;
+import org.apache.qpid.proton.engine.Connection;
+import org.apache.qpid.proton.engine.EndpointState;
+import org.apache.qpid.proton.engine.Event;
+import org.apache.qpid.proton.engine.Link;
+import org.apache.qpid.proton.engine.Session;
+
+/**
+ * Handshaker
+ *
+ */
+
+public class Handshaker extends BaseHandler
+{
+
+    @Override
+    public void onConnectionRemoteOpen(Event evt) {
+        Connection conn = evt.getConnection();
+        if (conn.getLocalState() == EndpointState.UNINITIALIZED) {
+            conn.open();
+        }
+    }
+
+    @Override
+    public void onSessionRemoteOpen(Event evt) {
+        Session ssn = evt.getSession();
+        if (ssn.getLocalState() == EndpointState.UNINITIALIZED) {
+            ssn.open();
+        }
+    }
+
+    @Override
+    public void onLinkRemoteOpen(Event evt) {
+        Link link = evt.getLink();
+        if (link.getLocalState() == EndpointState.UNINITIALIZED) {
+            link.setSource(link.getRemoteSource());
+            link.setTarget(link.getRemoteTarget());
+            link.open();
+        }
+    }
+
+    @Override
+    public void onConnectionRemoteClose(Event evt) {
+        Connection conn = evt.getConnection();
+        if (conn.getLocalState() != EndpointState.CLOSED) {
+            conn.close();
+        }
+    }
+
+    @Override
+    public void onSessionRemoteClose(Event evt) {
+        Session ssn = evt.getSession();
+        if (ssn.getLocalState() != EndpointState.CLOSED) {
+            ssn.close();
+        }
+    }
+
+    @Override
+    public void onLinkRemoteClose(Event evt) {
+        Link link = evt.getLink();
+        if (link.getLocalState() != EndpointState.CLOSED) {
+            link.close();
+        }
+    }
+
+}

Added: 
qpid/proton/trunk/examples/engine/java/src/main/java/org/apache/qpid/proton/examples/Message.java
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/examples/engine/java/src/main/java/org/apache/qpid/proton/examples/Message.java?rev=1631544&view=auto
==============================================================================
--- 
qpid/proton/trunk/examples/engine/java/src/main/java/org/apache/qpid/proton/examples/Message.java
 (added)
+++ 
qpid/proton/trunk/examples/engine/java/src/main/java/org/apache/qpid/proton/examples/Message.java
 Mon Oct 13 20:51:05 2014
@@ -0,0 +1,83 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.proton.examples;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+
+/**
+ * Message
+ *
+ */
+
+public class Message
+{
+    private final byte[] bytes;
+
+    /**
+     * These bytes are expected to be AMQP encoded.
+     */
+    public Message(byte[] bytes) {
+        this.bytes = bytes;
+    }
+
+    private static final byte[] PREFIX = {(byte)0x00, (byte)0x53, (byte)0x77, 
(byte)0xb1};
+
+    private static byte[] encodeString(String string) {
+        byte[] utf8 = string.getBytes();
+        byte[] result = new byte[PREFIX.length + 4 + utf8.length];
+        ByteBuffer bbuf = ByteBuffer.wrap(result);
+        bbuf.put(PREFIX);
+        bbuf.putInt(utf8.length);
+        bbuf.put(utf8);
+        return result;
+    }
+
+    public Message(String string) {
+        // XXX: special case string encoding for now
+        this(encodeString(string));
+    }
+
+    public byte[] getBytes() {
+        return bytes;
+    }
+
+    public String toString() {
+        StringBuilder bld = new StringBuilder();
+        bld.append("Message(");
+        for (byte b : bytes) {
+            if (b >= 32 && b < 127) {
+                bld.append((char) b);
+            } else {
+                bld.append("\\x");
+                String hex = Integer.toHexString(0xFF & b);
+                if (hex.length() < 2) {
+                    bld.append("0");
+                }
+                bld.append(hex);
+            }
+        }
+        bld.append(')');
+        return bld.toString();
+    }
+
+}

Added: 
qpid/proton/trunk/examples/engine/java/src/main/java/org/apache/qpid/proton/examples/Pool.java
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/examples/engine/java/src/main/java/org/apache/qpid/proton/examples/Pool.java?rev=1631544&view=auto
==============================================================================
--- 
qpid/proton/trunk/examples/engine/java/src/main/java/org/apache/qpid/proton/examples/Pool.java
 (added)
+++ 
qpid/proton/trunk/examples/engine/java/src/main/java/org/apache/qpid/proton/examples/Pool.java
 Mon Oct 13 20:51:05 2014
@@ -0,0 +1,153 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.proton.examples;
+
+import org.apache.qpid.proton.engine.Collector;
+import org.apache.qpid.proton.engine.Connection;
+import org.apache.qpid.proton.engine.Event;
+import org.apache.qpid.proton.engine.Session;
+import org.apache.qpid.proton.engine.Link;
+import org.apache.qpid.proton.engine.Sender;
+import org.apache.qpid.proton.engine.Receiver;
+
+import org.apache.qpid.proton.amqp.messaging.Source;
+import org.apache.qpid.proton.amqp.messaging.Target;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Pool
+ *
+ */
+
+public class Pool
+{
+
+    final private Collector collector;
+    final private Map<String,Connection> connections;
+
+    final private LinkConstructor<Sender> outgoingConstructor = new 
LinkConstructor<Sender> () {
+        public Sender create(Session ssn, String remote, String local) {
+            return newOutgoing(ssn, remote, local);
+        }
+    };
+    final private LinkConstructor<Receiver> incomingConstructor = new 
LinkConstructor<Receiver> () {
+        public Receiver create(Session ssn, String remote, String local) {
+            return newIncoming(ssn, remote, local);
+        }
+    };
+
+    final private LinkResolver<Sender> outgoingResolver;
+    final private LinkResolver<Receiver> incomingResolver;
+
+    public Pool(Collector collector, final Router router) {
+        this.collector = collector;
+        connections = new HashMap<String,Connection>();
+
+        if (router != null) {
+            outgoingResolver = new LinkResolver<Sender>() {
+                public Sender resolve(String address) {
+                    return router.getOutgoing(address).choose();
+                }
+            };
+            incomingResolver = new LinkResolver<Receiver>() {
+                public Receiver resolve(String address) {
+                    return router.getIncoming(address).choose();
+                }
+            };
+        } else {
+            outgoingResolver = new LinkResolver<Sender>() {
+                public Sender resolve(String address) { return null; }
+            };
+            incomingResolver = new LinkResolver<Receiver>() {
+                public Receiver resolve(String address) { return null; }
+            };
+        }
+    }
+
+    public Pool(Collector collector) {
+        this(collector, null);
+    }
+
+    private <T extends Link> T resolve(String remote, String local,
+                                       LinkResolver<T> resolver,
+                                       LinkConstructor<T> constructor) {
+        String host = remote.substring(2).split("/", 2)[0];
+        T link = resolver.resolve(remote);
+        if (link == null) {
+            Connection conn = connections.get(host);
+            if (conn == null) {
+                conn = Connection.Factory.create();
+                conn.collect(collector);
+                conn.setHostname(host);
+                conn.open();
+                connections.put(host, conn);
+            }
+
+            Session ssn = conn.session();
+            ssn.open();
+
+            link = constructor.create(ssn, remote, local);
+            link.open();
+        }
+        return link;
+    }
+
+    public Sender outgoing(String target, String source) {
+        return resolve(target, source, outgoingResolver, outgoingConstructor);
+    }
+
+    public Receiver incoming(String source, String target) {
+        return resolve(source, target, incomingResolver, incomingConstructor);
+    }
+
+    public Sender newOutgoing(Session ssn, String remote, String local) {
+        Sender snd = ssn.sender(String.format("%s-%s", local, remote));
+        Source src = new Source();
+        src.setAddress(local);
+        snd.setSource(src);
+        Target tgt = new Target();
+        tgt.setAddress(remote);
+        snd.setTarget(tgt);
+        return snd;
+    }
+
+    public Receiver newIncoming(Session ssn, String remote, String local) {
+        Receiver rcv = ssn.receiver(String.format("%s-%s", remote, local));
+        Source src = new Source();
+        src.setAddress(remote);
+        rcv.setSource(src);
+        Target tgt = new Target();
+        tgt.setAddress(remote);
+        rcv.setTarget(tgt);
+        return rcv;
+    }
+
+    public static interface LinkConstructor<T extends Link> {
+        T create(Session session, String remote, String local);
+    }
+
+    public static interface LinkResolver<T extends Link> {
+        T resolve(String remote);
+    }
+
+}

Added: 
qpid/proton/trunk/examples/engine/java/src/main/java/org/apache/qpid/proton/examples/Router.java
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/examples/engine/java/src/main/java/org/apache/qpid/proton/examples/Router.java?rev=1631544&view=auto
==============================================================================
--- 
qpid/proton/trunk/examples/engine/java/src/main/java/org/apache/qpid/proton/examples/Router.java
 (added)
+++ 
qpid/proton/trunk/examples/engine/java/src/main/java/org/apache/qpid/proton/examples/Router.java
 Mon Oct 13 20:51:05 2014
@@ -0,0 +1,191 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.proton.examples;
+
+import org.apache.qpid.proton.amqp.transport.Source;
+import org.apache.qpid.proton.amqp.transport.Target;
+import org.apache.qpid.proton.engine.BaseHandler;
+import org.apache.qpid.proton.engine.Event;
+import org.apache.qpid.proton.engine.Link;
+import org.apache.qpid.proton.engine.Receiver;
+import org.apache.qpid.proton.engine.Sender;
+
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+
+
+/**
+ * Router
+ *
+ */
+
+public class Router extends BaseHandler
+{
+
+    public static class Routes<T extends Link> {
+
+        List<T> routes = new ArrayList<T>();
+
+        void add(T route) {
+            routes.add(route);
+        }
+
+        void remove(T route) {
+            routes.remove(route);
+        }
+
+        int size() {
+            return routes.size();
+        }
+
+        public T choose() {
+            if (routes.isEmpty()) { return null; }
+            ThreadLocalRandom rand = ThreadLocalRandom.current();
+            int idx = rand.nextInt(0, routes.size());
+            return routes.get(idx);
+        }
+
+    }
+
+    private static final Routes<Sender> EMPTY_OUT = new Routes<Sender>();
+    private static final Routes<Receiver> EMPTY_IN = new Routes<Receiver>();
+
+    final private Map<String,Routes<Sender>> outgoing = new 
HashMap<String,Routes<Sender>>();
+    final private Map<String,Routes<Receiver>> incoming = new 
HashMap<String,Routes<Receiver>>();
+
+    public Router() {}
+
+    private String getAddress(Source source) {
+        if (source == null) {
+            return null;
+        } else {
+            return source.getAddress();
+        }
+    }
+
+    private String getAddress(Target target) {
+        if (target == null) {
+            return null;
+        } else {
+            return target.getAddress();
+        }
+    }
+
+    public String getAddress(Sender snd) {
+        String source = getAddress(snd.getSource());
+        String target = getAddress(snd.getTarget());
+        return source != null ? source : target;
+    }
+
+    public String getAddress(Receiver rcv) {
+        return getAddress(rcv.getTarget());
+    }
+
+    public Routes<Sender> getOutgoing(String address) {
+        Routes<Sender> routes = outgoing.get(address);
+        if (routes == null) { return EMPTY_OUT; }
+        return routes;
+    }
+
+    public Routes<Receiver> getIncoming(String address) {
+        Routes<Receiver> routes = incoming.get(address);
+        if (routes == null) { return EMPTY_IN; }
+        return routes;
+    }
+
+    private void add(Sender snd) {
+        String address = getAddress(snd);
+        Routes<Sender> routes = outgoing.get(address);
+        if (routes == null) {
+            routes = new Routes<Sender>();
+            outgoing.put(address, routes);
+        }
+        routes.add(snd);
+    }
+
+    private void remove(Sender snd) {
+        String address = getAddress(snd);
+        Routes<Sender> routes = outgoing.get(address);
+        if (routes != null) {
+            routes.remove(snd);
+            if (routes.size() == 0) {
+                outgoing.remove(address);
+            }
+        }
+    }
+
+    private void add(Receiver rcv) {
+        String address = getAddress(rcv);
+        Routes<Receiver> routes = incoming.get(address);
+        if (routes == null) {
+            routes = new Routes<Receiver>();
+            incoming.put(address, routes);
+        }
+        routes.add(rcv);
+    }
+
+    private void remove(Receiver rcv) {
+        String address = getAddress(rcv);
+        Routes<Receiver> routes = incoming.get(address);
+        if (routes != null) {
+            routes.remove(rcv);
+            if (routes.size() == 0) {
+                incoming.remove(address);
+            }
+        }
+    }
+
+    private void add(Link link) {
+        if (link instanceof Sender) {
+            add((Sender) link);
+        } else {
+            add((Receiver) link);
+        }
+    }
+
+    private void remove(Link link) {
+        if (link instanceof Sender) {
+            remove((Sender) link);
+        } else {
+            remove((Receiver) link);
+        }
+    }
+
+    @Override
+    public void onLinkLocalOpen(Event evt) {
+        add(evt.getLink());
+    }
+
+    @Override
+    public void onLinkLocalClose(Event evt) {
+        remove(evt.getLink());
+    }
+
+    @Override
+    public void onLinkFinal(Event evt) {
+        remove(evt.getLink());
+    }
+
+}

Added: 
qpid/proton/trunk/examples/engine/java/src/main/java/org/apache/qpid/proton/examples/Server.java
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/examples/engine/java/src/main/java/org/apache/qpid/proton/examples/Server.java?rev=1631544&view=auto
==============================================================================
--- 
qpid/proton/trunk/examples/engine/java/src/main/java/org/apache/qpid/proton/examples/Server.java
 (added)
+++ 
qpid/proton/trunk/examples/engine/java/src/main/java/org/apache/qpid/proton/examples/Server.java
 Mon Oct 13 20:51:05 2014
@@ -0,0 +1,179 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.proton.examples;
+
+import org.apache.qpid.proton.amqp.messaging.Accepted;
+import org.apache.qpid.proton.engine.BaseHandler;
+import org.apache.qpid.proton.engine.Collector;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.Event;
+import org.apache.qpid.proton.engine.Link;
+import org.apache.qpid.proton.engine.Receiver;
+import org.apache.qpid.proton.engine.Sender;
+
+import java.io.IOException;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Server
+ *
+ */
+
+public class Server extends BaseHandler
+{
+
+    private class MessageStore {
+
+        Map<String,Deque<Message>> messages = new 
HashMap<String,Deque<Message>>();
+
+        void put(String address, Message message) {
+            Deque<Message> queue = messages.get(address);
+            if (queue == null) {
+                queue = new ArrayDeque<Message>();
+                messages.put(address, queue);
+            }
+            queue.add(message);
+        }
+
+        Message get(String address) {
+            Deque<Message> queue = messages.get(address);
+            if (queue == null) { return null; }
+            Message msg = queue.remove();
+            if (queue.isEmpty()) {
+                messages.remove(address);
+            }
+            return msg;
+        }
+
+    }
+
+    final private MessageStore messages = new MessageStore();
+    final private Router router;
+    private boolean quiet;
+    private int tag = 0;
+
+    public Server(Router router, boolean quiet) {
+        this.router = router;
+        this.quiet = quiet;
+    }
+
+    private byte[] nextTag() {
+        return String.format("%s", tag++).getBytes();
+    }
+
+    private int send(String address) {
+        return send(address, null);
+    }
+
+    private int send(String address, Sender snd) {
+        if (snd == null) {
+            Router.Routes<Sender> routes = router.getOutgoing(address);
+            snd = routes.choose();
+            if (snd == null) {
+                return 0;
+            }
+        }
+
+        int count = 0;
+        while (snd.getCredit() > 0 && snd.getQueued() < 1024) {
+            Message msg = messages.get(address);
+            if (msg == null) {
+                snd.drained();
+                return count;
+            }
+            Delivery dlv = snd.delivery(nextTag());
+            byte[] bytes = msg.getBytes();
+            snd.send(bytes, 0, bytes.length);
+            dlv.settle();
+            count++;
+            if (!quiet) {
+                System.out.println(String.format("Sent message(%s): %s", 
address, msg));
+            }
+        }
+
+        return count;
+    }
+
+    @Override
+    public void onLinkFlow(Event evt) {
+        Link link = evt.getLink();
+        if (link instanceof Sender) {
+            Sender snd = (Sender) link;
+            send(router.getAddress(snd), snd);
+        }
+    }
+
+    @Override
+    public void onDelivery(Event evt) {
+        Delivery dlv = evt.getDelivery();
+        Link link = dlv.getLink();
+        if (link instanceof Sender) {
+            dlv.settle();
+        } else {
+            Receiver rcv = (Receiver) link;
+            if (!dlv.isPartial()) {
+                byte[] bytes = new byte[dlv.pending()];
+                rcv.recv(bytes, 0, bytes.length);
+                String address = router.getAddress(rcv);
+                Message message = new Message(bytes);
+                messages.put(address, message);
+                dlv.disposition(Accepted.getInstance());
+                dlv.settle();
+                if (!quiet) {
+                    System.out.println(String.format("Got message(%s): %s", 
address, message));
+                }
+                send(address);
+            }
+        }
+    }
+
+    public static final void main(String[] argv) throws IOException {
+        List<String> switches = new ArrayList<String>();
+        List<String> args = new ArrayList<String>();
+        for (String s : argv) {
+            if (s.startsWith("-")) {
+                switches.add(s);
+            } else {
+                args.add(s);
+            }
+        }
+
+        boolean quiet = switches.contains("-q");
+        String host = !args.isEmpty() && 
!Character.isDigit(args.get(0).charAt(0)) ?
+            args.remove(0) : "localhost";
+        int port = !args.isEmpty() ? Integer.parseInt(args.remove(0)) : 5672;
+
+        Collector collector = Collector.Factory.create();
+        Router router = new Router();
+        Driver driver = new Driver(collector, new Handshaker(),
+                                   new FlowController(1024), router,
+                                   new Server(router, quiet));
+        driver.listen(host, port);
+        driver.run();
+    }
+
+}

Added: 
qpid/proton/trunk/examples/engine/java/src/main/java/org/apache/qpid/proton/examples/Spout.java
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/examples/engine/java/src/main/java/org/apache/qpid/proton/examples/Spout.java?rev=1631544&view=auto
==============================================================================
--- 
qpid/proton/trunk/examples/engine/java/src/main/java/org/apache/qpid/proton/examples/Spout.java
 (added)
+++ 
qpid/proton/trunk/examples/engine/java/src/main/java/org/apache/qpid/proton/examples/Spout.java
 Mon Oct 13 20:51:05 2014
@@ -0,0 +1,116 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.proton.examples;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.qpid.proton.engine.BaseHandler;
+import org.apache.qpid.proton.engine.Collector;
+import org.apache.qpid.proton.engine.Connection;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.Event;
+import org.apache.qpid.proton.engine.Link;
+import org.apache.qpid.proton.engine.Sender;
+
+public class Spout extends BaseHandler
+{
+    private int count;
+    private int sent;
+    private int settled;
+    private boolean quiet;
+
+    public Spout(int count, boolean quiet) {
+        this.count = count;
+        this.quiet = quiet;
+    }
+
+    @Override
+    public void onLinkFlow(Event evt) {
+        Link link = evt.getLink();
+        if (link instanceof Sender) {
+            Sender sender = (Sender) link;
+            while ((sent < count) && sender.getCredit() > 0) {
+                Delivery dlv = sender.delivery(String.format("spout-%s", 
sent).getBytes());
+
+                Message msg = new Message(String.format("Hello World! [%s]", 
sent));
+                byte[] bytes = msg.getBytes();
+                sender.send(bytes, 0, bytes.length);
+                sender.advance();
+
+                if (!quiet) {
+                    System.out.println(String.format("Sent %s to %s: %s", new 
String(dlv.getTag()),
+                                                     
sender.getTarget().getAddress(), msg));
+                }
+                sent++;
+            }
+        }
+    }
+
+    @Override
+    public void onDelivery(Event evt) {
+        Delivery dlv = evt.getDelivery();
+        if (dlv.remotelySettled()) {
+            if (!quiet) {
+                System.out.println(String.format("Settled %s: %s", new 
String(dlv.getTag()), dlv.getRemoteState()));
+            }
+            dlv.settle();
+            settled++;
+        }
+
+        if (settled >= count) {
+            dlv.getLink().getSession().getConnection().close();
+        }
+    }
+
+    @Override
+    public void onConnectionRemoteClose(Event evt) {
+        System.out.println("settled: " + settled);
+    }
+
+    public static void main(String[] argv) throws Exception {
+        List<String> switches = new ArrayList<String>();
+        List<String> args = new ArrayList<String>();
+        for (String s : argv) {
+            if (s.startsWith("-")) {
+                switches.add(s);
+            } else {
+                args.add(s);
+            }
+        }
+
+        boolean quiet = switches.contains("-q");
+        String address = !args.isEmpty() && args.get(0).startsWith("/") ?
+            args.remove(0) : "//localhost";
+        int count = !args.isEmpty() ? Integer.parseInt(args.remove(0)) : 1;
+
+        Collector collector = Collector.Factory.create();
+
+        Spout spout = new Spout(count, quiet);
+
+        Driver driver = new Driver(collector, spout);
+
+        Pool pool = new Pool(collector);
+        pool.outgoing(address, null);
+
+        driver.run();
+    }
+}



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to