NO-JIRA: [go] Improved client/server example, minor doc updates

Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/97815c34
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/97815c34
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/97815c34

Branch: refs/heads/go1
Commit: 97815c342c68c4ee2f9919ccd20e8493296b1743
Parents: dbda49b
Author: Alan Conway <acon...@redhat.com>
Authored: Wed Oct 18 16:58:47 2017 +0100
Committer: Alan Conway <acon...@redhat.com>
Committed: Thu Oct 19 13:37:27 2017 +0100

----------------------------------------------------------------------
 examples/go/electron/receive.go                 |  2 +-
 examples/go/electron/send.go                    |  2 +-
 .../src/qpid.apache.org/electron/connection.go  | 24 ++++--
 .../src/qpid.apache.org/electron/container.go   |  3 +-
 .../electron/ex_client_server_test.go           | 81 -------------------
 .../electron/example_client_server_test.go      | 85 ++++++++++++++++++++
 6 files changed, 105 insertions(+), 92 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/97815c34/examples/go/electron/receive.go
----------------------------------------------------------------------
diff --git a/examples/go/electron/receive.go b/examples/go/electron/receive.go
index 93046b3..568fcc9 100644
--- a/examples/go/electron/receive.go
+++ b/examples/go/electron/receive.go
@@ -74,7 +74,7 @@ func main() {
                        defer wait.Done() // Notify main() when this goroutine 
is done.
                        url, err := amqp.ParseURL(urlStr)
                        fatalIf(err)
-                       c, err := container.Dial("tcp", url.Host)
+                       c, err := container.Dial("tcp", url.Host) // NOTE: Dial 
takes just the Host part of the URL
                        fatalIf(err)
                        connections <- c // Save connection so we can Close() 
when main() ends
                        addr := strings.TrimPrefix(url.Path, "/")

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/97815c34/examples/go/electron/send.go
----------------------------------------------------------------------
diff --git a/examples/go/electron/send.go b/examples/go/electron/send.go
index f96400b..ac4e367 100644
--- a/examples/go/electron/send.go
+++ b/examples/go/electron/send.go
@@ -73,7 +73,7 @@ func main() {
                        defer wait.Done() // Notify main() when this goroutine 
is done.
                        url, err := amqp.ParseURL(urlStr)
                        fatalIf(err)
-                       c, err := container.Dial("tcp", url.Host)
+                       c, err := container.Dial("tcp", url.Host) // NOTE: Dial 
takes just the Host part of the URL
                        fatalIf(err)
                        connections <- c // Save connection so we can Close() 
when main() ends
                        addr := strings.TrimPrefix(url.Path, "/")

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/97815c34/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go 
b/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go
index 8f62491..267ee1e 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go
@@ -83,11 +83,17 @@ type Connection interface {
        WaitTimeout(time.Duration) error
 
        // Incoming returns a channel for incoming endpoints opened by the 
remote peer.
-       // See the Incoming interface for more.
+       // See the Incoming interface for more detail.
        //
-       // Not receiving from Incoming() and calling Accept/Reject will block 
the
-       // electron event loop. You should run a loop to handle the types that
-       // interest you in a switch{} and and Accept() all others.
+       // Note: this channel will first return an *IncomingConnection for the
+       // connection itself which allows you to look at security information 
and
+       // decide whether to Accept() or Reject() the connection. Then it will 
return
+       // *IncomingSession, *IncomingSender and *IncomingReceiver as they are 
opened
+       // by the remote end.
+       //
+       // Note 2: you must receiving from Incoming() and call Accept/Reject to 
avoid
+       // blocking electron event loop. Normally you would run a loop in a 
goroutine
+       // to handle incoming types that interest and Accept() those that don't.
        Incoming() <-chan Incoming
 }
 
@@ -387,8 +393,9 @@ func globalSASLInit(eng *proton.Engine) {
 }
 
 // Dial is shorthand for using net.Dial() then NewConnection()
-func Dial(network, addr string, opts ...ConnectionOption) (c Connection, err 
error) {
-       conn, err := net.Dial(network, addr)
+// See net.Dial() for the meaning of the network, address arguments.
+func Dial(network, address string, opts ...ConnectionOption) (c Connection, 
err error) {
+       conn, err := net.Dial(network, address)
        if err == nil {
                c, err = NewConnection(conn, opts...)
        }
@@ -396,8 +403,9 @@ func Dial(network, addr string, opts ...ConnectionOption) 
(c Connection, err err
 }
 
 // DialWithDialer is shorthand for using dialer.Dial() then NewConnection()
-func DialWithDialer(dialer *net.Dialer, network, addr string, opts 
...ConnectionOption) (c Connection, err error) {
-       conn, err := dialer.Dial(network, addr)
+// See net.Dial() for the meaning of the network, address arguments.
+func DialWithDialer(dialer *net.Dialer, network, address string, opts 
...ConnectionOption) (c Connection, err error) {
+       conn, err := dialer.Dial(network, address)
        if err == nil {
                c, err = NewConnection(conn, opts...)
        }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/97815c34/proton-c/bindings/go/src/qpid.apache.org/electron/container.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/container.go 
b/proton-c/bindings/go/src/qpid.apache.org/electron/container.go
index efb24ff..7c19aa5 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/container.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/container.go
@@ -43,7 +43,8 @@ type Container interface {
 
        // Dial is shorthand for
        //     conn, err := net.Dial(); c, err := Connection(conn, opts...)
-       Dial(network string, addr string, opts ...ConnectionOption) 
(Connection, error)
+       // See net.Dial() for the meaning of the network, address arguments.
+       Dial(network string, address string, opts ...ConnectionOption) 
(Connection, error)
 
        // Accept is shorthand for:
        //     conn, err := l.Accept(); c, err := Connection(conn, append(opts, 
Server()...)

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/97815c34/proton-c/bindings/go/src/qpid.apache.org/electron/ex_client_server_test.go
----------------------------------------------------------------------
diff --git 
a/proton-c/bindings/go/src/qpid.apache.org/electron/ex_client_server_test.go 
b/proton-c/bindings/go/src/qpid.apache.org/electron/ex_client_server_test.go
deleted file mode 100644
index 93f275b..0000000
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/ex_client_server_test.go
+++ /dev/null
@@ -1,81 +0,0 @@
-package electron_test
-
-import (
-       "fmt"
-       "net"
-       "qpid.apache.org/amqp"
-       "qpid.apache.org/electron"
-)
-
-//  Print errors
-func check(msg string, err error) bool {
-       if err != nil {
-               fmt.Printf("%s: %s\n", msg, err)
-       }
-       return err == nil
-}
-
-func runServer(cont electron.Container, l net.Listener) {
-       for c, err := cont.Accept(l); check("accept connection", err); c, err = 
cont.Accept(l) {
-               go func() { // Process connections concurrently, accepting AMQP 
endpoints
-                       for in := range c.Incoming() {
-                               ep := in.Accept() // Accept all endpoints
-                               go func() {       // Process endpoints 
concurrently
-                                       switch ep := ep.(type) {
-                                       case electron.Sender:
-                                               m := amqp.NewMessageWith("hello 
yourself")
-                                               fmt.Printf("server %q sending 
%q\n", ep.Source(), m.Body())
-                                               ep.SendForget(m) // One-way 
send, client does not need to Accept.
-                                       case electron.Receiver:
-                                               if rm, err := ep.Receive(); 
check("server receive", err) {
-                                                       fmt.Printf("server %q 
received %q\n", ep.Target(), rm.Message.Body())
-                                                       err := rm.Accept() // 
Client is waiting for Accept.
-                                                       check("accept message", 
err)
-                                               }
-                                       }
-                               }()
-                       }
-               }()
-       }
-}
-
-func startServer() (addr net.Addr) {
-       cont := electron.NewContainer("server")
-       if l, err := net.Listen("tcp", ""); check("listen", err) {
-               addr = l.Addr()
-               go runServer(cont, l)
-       }
-       return addr
-}
-
-// Connect to addr and send/receive a message.
-func client(addr net.Addr) {
-       if c, err := electron.Dial(addr.Network(), addr.String()); 
check("dial", err) {
-               defer c.Close(nil)
-               if s, err := c.Sender(electron.Target("target")); 
check("sender", err) {
-                       fmt.Printf("client sending\n")
-                       s.SendSync(amqp.NewMessageWith("hello")) // Send and 
wait for server to Accept()
-               }
-               if r, err := c.Receiver(electron.Source("source")); 
check("receiver", err) {
-                       if rm, err := r.Receive(); err == nil {
-                               fmt.Printf("client received %q\n", 
rm.Message.Body())
-                       }
-               }
-       }
-}
-
-// Example client and server communicating via AMQP over a TCP/IP connection.
-//
-// Normally client and server would be separate processes.
-// For more realistic examples:
-//     https://github.com/apache/qpid-proton/blob/master/examples/go/README.md
-//
-func Example_clientServer() {
-       addr := startServer()
-       client(addr)
-       // Output:
-       // client sending
-       // server "target" received "hello"
-       // server "source" sending "hello yourself"
-       // client received "hello yourself"
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/97815c34/proton-c/bindings/go/src/qpid.apache.org/electron/example_client_server_test.go
----------------------------------------------------------------------
diff --git 
a/proton-c/bindings/go/src/qpid.apache.org/electron/example_client_server_test.go
 
b/proton-c/bindings/go/src/qpid.apache.org/electron/example_client_server_test.go
new file mode 100644
index 0000000..3aa5892
--- /dev/null
+++ 
b/proton-c/bindings/go/src/qpid.apache.org/electron/example_client_server_test.go
@@ -0,0 +1,85 @@
+package electron_test
+
+import (
+       "fmt"
+       "net"
+       "qpid.apache.org/amqp"
+       "qpid.apache.org/electron"
+       "sync"
+)
+
+// Example Server that accepts a single Connection, Session and Receiver link
+// and prints messages received until the link closes.
+func Server(l net.Listener) {
+       cont := electron.NewContainer("server")
+       c, _ := cont.Accept(l) // Ignoring error handling
+       l.Close()              // This server only accepts one connection
+       // Process incoming endpoints till we get a Receiver link
+       var r electron.Receiver
+       for r == nil {
+               in := <-c.Incoming()
+               switch in := in.(type) {
+               case *electron.IncomingSession, *electron.IncomingConnection:
+                       in.Accept() // Accept the incoming connection and 
session for the receiver
+               case *electron.IncomingReceiver:
+                       in.SetCapacity(10)
+                       in.SetPrefetch(true) // Automatic flow control for a 
buffer of 10 messages.
+                       r = in.Accept().(electron.Receiver)
+               case nil:
+                       return // Connection is closed
+               default:
+                       in.Reject(amqp.Errorf("example-server", "unexpected 
endpoint %v", in))
+               }
+       }
+       go func() { // Reject any further incoming endpoints
+               for in := range c.Incoming() {
+                       in.Reject(amqp.Errorf("example-server", "unexpected 
endpoint %v", in))
+               }
+       }()
+       // Receive messages till the Receiver closes
+       rm, err := r.Receive()
+       for ; err == nil; rm, err = r.Receive() {
+               fmt.Printf("server received: %q\n", rm.Message.Body())
+               rm.Accept() // Signal to the client that the message was 
accepted
+       }
+       fmt.Printf("server receiver closed: %v\n", err)
+}
+
+// Example client sending messages to a server running in a goroutine.
+//
+// Normally client and server would be separate processes. For more realistic 
and detailed examples:
+//     https://github.com/apache/qpid-proton/blob/master/examples/go/README.md
+//
+func Example_clientServer() {
+       // NOTE: We ignoring error handling in this example
+       l, _ := net.Listen("tcp", "") // Open a listening port for server, 
client connect to this port
+
+       // SERVER: start the server running in a separate goroutine
+       var waitServer sync.WaitGroup // We will wait for the server goroutine 
to finish before exiting
+       waitServer.Add(1)
+       go func() { // Run the server in the background
+               defer waitServer.Done()
+               Server(l)
+       }()
+
+       // CLIENT: Send messages to the server
+       addr := l.Addr()
+       c, _ := electron.Dial(addr.Network(), addr.String())
+       s, _ := c.Sender()
+       for i := 0; i < 3; i++ {
+               msg := fmt.Sprintf("hello %v", i)
+               // Send and wait for the Outcome from the server.
+               // Note: For higher throughput, use SendAsync() to send a 
stream of messages
+               // and process the returning stream of Outcomes concurrently.
+               s.SendSync(amqp.NewMessageWith(msg))
+       }
+       c.Close(nil) // Closing the connection will stop the server
+
+       waitServer.Wait() // Let the server finish
+
+       // Output:
+       // server received: "hello 0"
+       // server received: "hello 1"
+       // server received: "hello 2"
+       // server receiver closed: EOF
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to