NO-JIRA: [go] Improved client/server example, minor doc updates
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/97815c34 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/97815c34 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/97815c34 Branch: refs/heads/go1 Commit: 97815c342c68c4ee2f9919ccd20e8493296b1743 Parents: dbda49b Author: Alan Conway <acon...@redhat.com> Authored: Wed Oct 18 16:58:47 2017 +0100 Committer: Alan Conway <acon...@redhat.com> Committed: Thu Oct 19 13:37:27 2017 +0100 ---------------------------------------------------------------------- examples/go/electron/receive.go | 2 +- examples/go/electron/send.go | 2 +- .../src/qpid.apache.org/electron/connection.go | 24 ++++-- .../src/qpid.apache.org/electron/container.go | 3 +- .../electron/ex_client_server_test.go | 81 ------------------- .../electron/example_client_server_test.go | 85 ++++++++++++++++++++ 6 files changed, 105 insertions(+), 92 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/97815c34/examples/go/electron/receive.go ---------------------------------------------------------------------- diff --git a/examples/go/electron/receive.go b/examples/go/electron/receive.go index 93046b3..568fcc9 100644 --- a/examples/go/electron/receive.go +++ b/examples/go/electron/receive.go @@ -74,7 +74,7 @@ func main() { defer wait.Done() // Notify main() when this goroutine is done. 
url, err := amqp.ParseURL(urlStr) fatalIf(err) - c, err := container.Dial("tcp", url.Host) + c, err := container.Dial("tcp", url.Host) // NOTE: Dial takes just the Host part of the URL fatalIf(err) connections <- c // Save connection so we can Close() when main() ends addr := strings.TrimPrefix(url.Path, "/") http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/97815c34/examples/go/electron/send.go ---------------------------------------------------------------------- diff --git a/examples/go/electron/send.go b/examples/go/electron/send.go index f96400b..ac4e367 100644 --- a/examples/go/electron/send.go +++ b/examples/go/electron/send.go @@ -73,7 +73,7 @@ func main() { defer wait.Done() // Notify main() when this goroutine is done. url, err := amqp.ParseURL(urlStr) fatalIf(err) - c, err := container.Dial("tcp", url.Host) + c, err := container.Dial("tcp", url.Host) // NOTE: Dial takes just the Host part of the URL fatalIf(err) connections <- c // Save connection so we can Close() when main() ends addr := strings.TrimPrefix(url.Path, "/") http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/97815c34/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go b/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go index 8f62491..267ee1e 100644 --- a/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go +++ b/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go @@ -83,11 +83,17 @@ type Connection interface { WaitTimeout(time.Duration) error // Incoming returns a channel for incoming endpoints opened by the remote peer. - // See the Incoming interface for more. + // See the Incoming interface for more detail. // - // Not receiving from Incoming() and calling Accept/Reject will block the - // electron event loop. 
You should run a loop to handle the types that - // interest you in a switch{} and and Accept() all others. + // Note: this channel will first return an *IncomingConnection for the + // connection itself which allows you to look at security information and + // decide whether to Accept() or Reject() the connection. Then it will return + // *IncomingSession, *IncomingSender and *IncomingReceiver as they are opened + // by the remote end. + // + // Note 2: you must receive from Incoming() and call Accept/Reject to avoid + // blocking the electron event loop. Normally you would run a loop in a goroutine + // to handle incoming types that interest you and Accept() those that don't. Incoming() <-chan Incoming } @@ -387,8 +393,9 @@ func globalSASLInit(eng *proton.Engine) { } // Dial is shorthand for using net.Dial() then NewConnection() -func Dial(network, addr string, opts ...ConnectionOption) (c Connection, err error) { - conn, err := net.Dial(network, addr) +// See net.Dial() for the meaning of the network, address arguments. +func Dial(network, address string, opts ...ConnectionOption) (c Connection, err error) { + conn, err := net.Dial(network, address) if err == nil { c, err = NewConnection(conn, opts...) } @@ -396,8 +403,9 @@ func Dial(network, addr string, opts ...ConnectionOption) (c Connection, err err } // DialWithDialer is shorthand for using dialer.Dial() then NewConnection() -func DialWithDialer(dialer *net.Dialer, network, addr string, opts ...ConnectionOption) (c Connection, err error) { - conn, err := dialer.Dial(network, addr) +// See net.Dial() for the meaning of the network, address arguments. +func DialWithDialer(dialer *net.Dialer, network, address string, opts ...ConnectionOption) (c Connection, err error) { + conn, err := dialer.Dial(network, address) if err == nil { c, err = NewConnection(conn, opts...) 
} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/97815c34/proton-c/bindings/go/src/qpid.apache.org/electron/container.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/container.go b/proton-c/bindings/go/src/qpid.apache.org/electron/container.go index efb24ff..7c19aa5 100644 --- a/proton-c/bindings/go/src/qpid.apache.org/electron/container.go +++ b/proton-c/bindings/go/src/qpid.apache.org/electron/container.go @@ -43,7 +43,8 @@ type Container interface { // Dial is shorthand for // conn, err := net.Dial(); c, err := Connection(conn, opts...) - Dial(network string, addr string, opts ...ConnectionOption) (Connection, error) + // See net.Dial() for the meaning of the network, address arguments. + Dial(network string, address string, opts ...ConnectionOption) (Connection, error) // Accept is shorthand for: // conn, err := l.Accept(); c, err := Connection(conn, append(opts, Server()...) http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/97815c34/proton-c/bindings/go/src/qpid.apache.org/electron/ex_client_server_test.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/ex_client_server_test.go b/proton-c/bindings/go/src/qpid.apache.org/electron/ex_client_server_test.go deleted file mode 100644 index 93f275b..0000000 --- a/proton-c/bindings/go/src/qpid.apache.org/electron/ex_client_server_test.go +++ /dev/null @@ -1,81 +0,0 @@ -package electron_test - -import ( - "fmt" - "net" - "qpid.apache.org/amqp" - "qpid.apache.org/electron" -) - -// Print errors -func check(msg string, err error) bool { - if err != nil { - fmt.Printf("%s: %s\n", msg, err) - } - return err == nil -} - -func runServer(cont electron.Container, l net.Listener) { - for c, err := cont.Accept(l); check("accept connection", err); c, err = cont.Accept(l) { - go func() { // Process connections concurrently, accepting AMQP 
endpoints - for in := range c.Incoming() { - ep := in.Accept() // Accept all endpoints - go func() { // Process endpoints concurrently - switch ep := ep.(type) { - case electron.Sender: - m := amqp.NewMessageWith("hello yourself") - fmt.Printf("server %q sending %q\n", ep.Source(), m.Body()) - ep.SendForget(m) // One-way send, client does not need to Accept. - case electron.Receiver: - if rm, err := ep.Receive(); check("server receive", err) { - fmt.Printf("server %q received %q\n", ep.Target(), rm.Message.Body()) - err := rm.Accept() // Client is waiting for Accept. - check("accept message", err) - } - } - }() - } - }() - } -} - -func startServer() (addr net.Addr) { - cont := electron.NewContainer("server") - if l, err := net.Listen("tcp", ""); check("listen", err) { - addr = l.Addr() - go runServer(cont, l) - } - return addr -} - -// Connect to addr and send/receive a message. -func client(addr net.Addr) { - if c, err := electron.Dial(addr.Network(), addr.String()); check("dial", err) { - defer c.Close(nil) - if s, err := c.Sender(electron.Target("target")); check("sender", err) { - fmt.Printf("client sending\n") - s.SendSync(amqp.NewMessageWith("hello")) // Send and wait for server to Accept() - } - if r, err := c.Receiver(electron.Source("source")); check("receiver", err) { - if rm, err := r.Receive(); err == nil { - fmt.Printf("client received %q\n", rm.Message.Body()) - } - } - } -} - -// Example client and server communicating via AMQP over a TCP/IP connection. -// -// Normally client and server would be separate processes. 
-// For more realistic examples: -// https://github.com/apache/qpid-proton/blob/master/examples/go/README.md -// -func Example_clientServer() { - addr := startServer() - client(addr) - // Output: - // client sending - // server "target" received "hello" - // server "source" sending "hello yourself" - // client received "hello yourself" -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/97815c34/proton-c/bindings/go/src/qpid.apache.org/electron/example_client_server_test.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/example_client_server_test.go b/proton-c/bindings/go/src/qpid.apache.org/electron/example_client_server_test.go new file mode 100644 index 0000000..3aa5892 --- /dev/null +++ b/proton-c/bindings/go/src/qpid.apache.org/electron/example_client_server_test.go @@ -0,0 +1,85 @@ +package electron_test + +import ( + "fmt" + "net" + "qpid.apache.org/amqp" + "qpid.apache.org/electron" + "sync" +) + +// Example Server that accepts a single Connection, Session and Receiver link +// and prints messages received until the link closes. +func Server(l net.Listener) { + cont := electron.NewContainer("server") + c, _ := cont.Accept(l) // Ignoring error handling + l.Close() // This server only accepts one connection + // Process incoming endpoints till we get a Receiver link + var r electron.Receiver + for r == nil { + in := <-c.Incoming() + switch in := in.(type) { + case *electron.IncomingSession, *electron.IncomingConnection: + in.Accept() // Accept the incoming connection and session for the receiver + case *electron.IncomingReceiver: + in.SetCapacity(10) + in.SetPrefetch(true) // Automatic flow control for a buffer of 10 messages. 
+ r = in.Accept().(electron.Receiver) + case nil: + return // Connection is closed + default: + in.Reject(amqp.Errorf("example-server", "unexpected endpoint %v", in)) + } + } + go func() { // Reject any further incoming endpoints + for in := range c.Incoming() { + in.Reject(amqp.Errorf("example-server", "unexpected endpoint %v", in)) + } + }() + // Receive messages till the Receiver closes + rm, err := r.Receive() + for ; err == nil; rm, err = r.Receive() { + fmt.Printf("server received: %q\n", rm.Message.Body()) + rm.Accept() // Signal to the client that the message was accepted + } + fmt.Printf("server receiver closed: %v\n", err) +} + +// Example client sending messages to a server running in a goroutine. +// +// Normally client and server would be separate processes. For more realistic and detailed examples: +// https://github.com/apache/qpid-proton/blob/master/examples/go/README.md +// +func Example_clientServer() { + // NOTE: We are ignoring error handling in this example + l, _ := net.Listen("tcp", "") // Open a listening port for server, client connects to this port + + // SERVER: start the server running in a separate goroutine + var waitServer sync.WaitGroup // We will wait for the server goroutine to finish before exiting + waitServer.Add(1) + go func() { // Run the server in the background + defer waitServer.Done() + Server(l) + }() + + // CLIENT: Send messages to the server + addr := l.Addr() + c, _ := electron.Dial(addr.Network(), addr.String()) + s, _ := c.Sender() + for i := 0; i < 3; i++ { + msg := fmt.Sprintf("hello %v", i) + // Send and wait for the Outcome from the server. + // Note: For higher throughput, use SendAsync() to send a stream of messages + // and process the returning stream of Outcomes concurrently. 
+ s.SendSync(amqp.NewMessageWith(msg)) + } + c.Close(nil) // Closing the connection will stop the server + + waitServer.Wait() // Let the server finish + + // Output: + // server received: "hello 0" + // server received: "hello 1" + // server received: "hello 2" + // server receiver closed: EOF +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org