PROTON-827: Simplified examples and Connection error handling.

- Simplified non-relevant code in examples (logging, argument handling)
- Improved error handling on API see Connection.Error(). Need more on other 
endpoints.
- Added -debug flag to example_test to help debug example problems.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/495d7413
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/495d7413
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/495d7413

Branch: refs/heads/proton-go
Commit: 495d74136f165f37f692e68710a1b6cdbd74dc98
Parents: 7cecb8e
Author: Alan Conway <acon...@redhat.com>
Authored: Mon May 25 17:14:22 2015 -0400
Committer: Alan Conway <acon...@redhat.com>
Committed: Mon Sep 28 17:19:56 2015 -0400

----------------------------------------------------------------------
 examples/go/example_test.go                     |  30 ++--
 examples/go/receive.go                          | 140 +++++++------------
 examples/go/send.go                             | 108 ++++++--------
 proton-c/bindings/go/genwrap.go                 | 108 +++++++-------
 .../src/qpid.apache.org/proton/go/event/pump.go |   5 +-
 .../qpid.apache.org/proton/go/internal/error.go |  25 ++--
 .../proton/go/messaging/messaging.go            |  51 ++++---
 7 files changed, 215 insertions(+), 252 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/495d7413/examples/go/example_test.go
----------------------------------------------------------------------
diff --git a/examples/go/example_test.go b/examples/go/example_test.go
index a4b4c2c..8879c38 100644
--- a/examples/go/example_test.go
+++ b/examples/go/example_test.go
@@ -78,13 +78,11 @@ func (b *broker) check() error {
 
 // Start the demo broker, wait till it is listening on *addr. No-op if already 
started.
 func (b *broker) start() error {
-       build("event_broker.go")
        if b.cmd == nil { // Not already started
                // FIXME aconway 2015-04-30: better way to pick/configure a 
broker port.
                b.addr = fmt.Sprintf("127.0.0.1:%d", rand.Intn(10000)+10000)
-               b.cmd = exec.Command(exepath("event_broker"), "-addr", b.addr, 
"-verbose", "0")
+               b.cmd = exampleCommand("event_broker", "-addr", b.addr)
                b.runerr = make(chan error)
-               // Change the -verbose setting above to see broker output on 
stdout/stderr.
                b.cmd.Stderr, b.cmd.Stdout = os.Stderr, os.Stdout
                go func() {
                        b.runerr <- b.cmd.Run()
@@ -111,7 +109,12 @@ func checkEqual(want interface{}, got interface{}) error {
 // runCommand returns an exec.Cmd to run an example.
 func exampleCommand(prog string, arg ...string) *exec.Cmd {
        build(prog + ".go")
-       cmd := exec.Command(exepath(prog), arg...)
+       args := []string{}
+       if *debug {
+               args = append(args, "-debug=true")
+       }
+       args = append(args, arg...)
+       cmd := exec.Command(exepath(prog), args...)
        cmd.Stderr = os.Stderr
        return cmd
 }
@@ -149,16 +152,16 @@ func TestExampleSendReceive(t *testing.T) {
        }
        testBroker.start()
        err := runExampleWant(
-               "send: Received all 15 acknowledgements\n",
+               "Received all 15 acknowledgements\n",
                "send",
-               exampleArgs("-count", "5", "-verbose", "1")...)
+               exampleArgs("-count", "5")...)
        if err != nil {
                t.Fatal(err)
        }
        err = runExampleWant(
-               "receive: Listening\nreceive: Received 15 messages\n",
+               "Listening on 3 connections\nReceived 15 messages\n",
                "receive",
-               exampleArgs("-verbose", "1", "-count", "15")...)
+               exampleArgs("-count", "15")...)
        if err != nil {
                t.Fatal(err)
        }
@@ -187,7 +190,7 @@ func goReceiveWant(errchan chan<- error, want string, arg 
...string) *exec.Cmd {
                        errchan <- err
                        return
                }
-               listening := "receive: Listening\n"
+               listening := "Listening on 3 connections\n"
                if line != listening {
                        errchan <- checkEqual(listening, line)
                        return
@@ -209,8 +212,8 @@ func TestExampleReceiveSend(t *testing.T) {
        testBroker.start()
        recvErr := make(chan error)
        recvCmd := goReceiveWant(recvErr,
-               "receive: Received 15 messages\n",
-               exampleArgs("-count", "15", "-verbose", "1")...)
+               "Received 15 messages\n",
+               exampleArgs("-count", "15")...)
        defer func() {
                recvCmd.Process.Kill()
                recvCmd.Wait()
@@ -219,9 +222,9 @@ func TestExampleReceiveSend(t *testing.T) {
                t.Fatal(err)
        }
        err := runExampleWant(
-               "send: Received all 15 acknowledgements\n",
+               "Received all 15 acknowledgements\n",
                "send",
-               exampleArgs("-count", "5", "-verbose", "1")...)
+               exampleArgs("-count", "5")...)
        if err != nil {
                t.Fatal(err)
        }
@@ -263,6 +266,7 @@ func build(prog string) {
 }
 
 var rpath = flag.String("rpath", "", "Runtime path for test executables")
+var debug = flag.Bool("debug", false, "Debugging output from examples")
 
 func TestMain(m *testing.M) {
        rand.Seed(time.Now().UTC().UnixNano())

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/495d7413/examples/go/receive.go
----------------------------------------------------------------------
diff --git a/examples/go/receive.go b/examples/go/receive.go
index e31862b..b1eb309 100644
--- a/examples/go/receive.go
+++ b/examples/go/receive.go
@@ -22,87 +22,65 @@ package main
 import (
        "flag"
        "fmt"
-       "io"
-       "io/ioutil"
-       "log"
-       "math"
        "net"
        "os"
-       "path"
        "qpid.apache.org/proton/go/amqp"
        "qpid.apache.org/proton/go/messaging"
        "sync"
-       "time"
 )
 
-// Command-line flags
-var verbose = flag.Int("verbose", 1, "Output level, 0 means none, higher means 
more")
-var count = flag.Int64("count", 0, "Stop after receiving this many messages. 0 
means unlimited.")
-var timeout = flag.Int64("time", 0, "Stop after this many seconds. 0 means 
unlimited.")
+// Usage and command-line flags
+func usage() {
+       fmt.Fprintf(os.Stderr, `Usage: %s url [url ...]
+Receive messages from all the listed URLs concurrently and print them.
+`, os.Args[0])
+       flag.PrintDefaults()
+}
+
+var debug = flag.Bool("debug", false, "Print detailed debug output")
+var count = flag.Uint64("count", 1, "Stop after receiving this many messages.")
 var full = flag.Bool("full", false, "Print full message not just body.")
 
 func main() {
-       // Parse flags and arguments, print usage message on error.
-       flag.Usage = func() {
-               fmt.Fprintf(os.Stderr, `
-Usage: %s url [url ...]
-Receive messages from all the listed URLs concurrently and print them.
-`, os.Args[0])
-               flag.PrintDefaults()
-       }
+       flag.Usage = usage
        flag.Parse()
+
        urls := flag.Args() // Non-flag arguments are URLs to receive from
        if len(urls) == 0 {
-               flag.Usage()
-               fmt.Fprintf(os.Stderr, "No URL provided")
+               fmt.Fprintln(os.Stderr, "No URL provided")
+               usage()
                os.Exit(1)
        }
-       duration := time.Duration(*timeout) * time.Second
-       if duration == 0 {
-               duration = time.Duration(math.MaxInt64) // Not forever, but 290 
years is close enough.
-       }
-       if *count == 0 {
-               *count = math.MaxInt64
-       }
-
-       // Create a goroutine for each URL that receives messages and sends 
them to
-       // the messages channel. main() receives and prints them.
 
        messages := make(chan amqp.Message) // Channel for messages from 
goroutines to main()
        stop := make(chan struct{})         // Closing this channel means the 
program is stopping.
+       var wait sync.WaitGroup             // Used by main() to wait for all 
goroutines to end.
+       wait.Add(len(urls))                 // Wait for one goroutine per URL.
 
-       var wait sync.WaitGroup // Used by main() to wait for all goroutines to 
end.
-
-       wait.Add(len(urls)) // Wait for one goroutine per URL.
-
-       // Arrange to close all connections on exit
-       connections := make([]*messaging.Connection, len(urls))
-       defer func() {
-               for _, c := range connections {
-                       if c != nil {
-                               c.Close()
-                       }
-               }
-       }()
+       connections := make([]*messaging.Connection, len(urls)) // Store 
connctions to close on exit
 
+       // Start a goroutine to for each URL to receive messages and send them 
to the messages channel.
+       // main() receives and prints them.
        for i, urlStr := range urls {
-               debug.Printf("Connecting to %s", urlStr)
-               go func(urlStr string) {
-                       defer wait.Done()                 // Notify main() that 
this goroutine is done.
+               debugf("debug: Connecting to %s\n", urlStr)
+               go func(urlStr string) { // Start the goroutine
+
+                       defer wait.Done()                 // Notify main() when 
this goroutine is done.
                        url, err := amqp.ParseURL(urlStr) // Like 
net/url.Parse() but with AMQP defaults.
-                       fatalIf(err)
+                       exitIf(err)
 
                        // Open a standard Go net.Conn and and AMQP connection 
using it.
                        conn, err := net.Dial("tcp", url.Host) // Note 
net.URL.Host is actually "host:port"
-                       fatalIf(err)
+                       exitIf(err)
                        pc, err := messaging.Connect(conn) // This is our AMQP 
connection.
-                       fatalIf(err)
-                       connections[i] = pc // So we can close it when main() 
ends
+                       exitIf(err)
+                       connections[i] = pc // Save connection so it will be 
closed when main() ends
 
                        // Create a receiver using the path of the URL as the 
AMQP address
                        r, err := pc.Receiver(url.Path)
-                       fatalIf(err)
+                       exitIf(err)
 
+                       // Loop receiving messages
                        for {
                                var m amqp.Message
                                select { // Receive a message or stop.
@@ -118,57 +96,43 @@ Receive messages from all the listed URLs concurrently and 
print them.
                        }
                }(urlStr)
        }
-       info.Printf("Listening")
-
-       // time.After() returns a channel that will close when the timeout is 
up.
-       timer := time.After(duration)
-
-       // main() prints each message and checks for count or timeout being 
exceeded.
-       for i := int64(0); i < *count; i++ {
-               select {
-               case m := <-messages:
-                       debug.Print(formatMessage{m})
-               case <-timer: // Timeout has expired
-                       i = 0
-               }
+
+       // All goroutines are started, we are receiving messages.
+       fmt.Printf("Listening on %d connections\n", len(urls))
+
+       // print each message until the count is exceeded.
+       for i := uint64(0); i < *count; i++ {
+               debugf("%s\n", formatMessage(<-messages))
        }
-       info.Printf("Received %d messages", *count)
+       fmt.Printf("Received %d messages\n", *count)
        close(stop) // Signal all goroutines to stop.
        wait.Wait() // Wait for all goroutines to finish.
-}
-
-// Logging
-func logger(prefix string, level int, w io.Writer) *log.Logger {
-       if *verbose >= level {
-               return log.New(w, prefix, 0)
+       close(messages)
+       for _, c := range connections { // Close all connections
+               if c != nil {
+                       c.Close()
+               }
        }
-       return log.New(ioutil.Discard, "", 0)
 }
 
-var info, debug *log.Logger
-
-func init() {
-       flag.Parse()
-       name := path.Base(os.Args[0])
-       log.SetFlags(0)                                               // Use 
default logger for errors.
-       log.SetPrefix(fmt.Sprintf("%s: ", name))                      // Log 
errors on stderr.
-       info = logger(fmt.Sprintf("%s: ", name), 1, os.Stdout)        // Log 
info on stdout.
-       debug = logger(fmt.Sprintf("%s debug: ", name), 2, os.Stderr) // Log 
debug on stderr.
+// Simple debug logging
+func debugf(format string, data ...interface{}) {
+       if *debug {
+               fmt.Fprintf(os.Stderr, format, data...)
+       }
 }
 
 // Simple error handling for demo.
-func fatalIf(err error) {
+func exitIf(err error) {
        if err != nil {
-               log.Fatal(err)
+               fmt.Fprintln(os.Stderr, err)
        }
 }
 
-type formatMessage struct{ m amqp.Message }
-
-func (fm formatMessage) String() string {
+func formatMessage(m amqp.Message) string {
        if *full {
-               return fmt.Sprintf("%#v", fm.m)
+               return fmt.Sprintf("%#v", m)
        } else {
-               return fmt.Sprintf("%#v", fm.m.Body())
+               return fmt.Sprintf("%#v", m.Body())
        }
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/495d7413/examples/go/send.go
----------------------------------------------------------------------
diff --git a/examples/go/send.go b/examples/go/send.go
index 4aaeb43..98acefa 100644
--- a/examples/go/send.go
+++ b/examples/go/send.go
@@ -22,21 +22,23 @@ package main
 import (
        "flag"
        "fmt"
-       "io"
-       "io/ioutil"
-       "log"
-       "math"
        "net"
        "os"
-       "path"
        "qpid.apache.org/proton/go/amqp"
        "qpid.apache.org/proton/go/messaging"
        "sync"
 )
 
-// Command-line flags
-var verbose = flag.Int("verbose", 1, "Output level, 0 means none, higher means 
more")
-var count = flag.Int64("count", 1, "Send this may messages per address. 0 
means unlimited.")
+// Usage and command-line flags
+func usage() {
+       fmt.Fprintf(os.Stderr, `Usage: %s url [url ...]
+Send messages to each URL concurrently with body "<url-path>-<n>" where n is 
the message number.
+`, os.Args[0])
+       flag.PrintDefaults()
+}
+
+var debug = flag.Bool("debug", false, "Print detailed debug output")
+var count = flag.Int64("count", 1, "Send this may messages per address.")
 
 // Ack associates an info string with an acknowledgement
 type Ack struct {
@@ -45,67 +47,51 @@ type Ack struct {
 }
 
 func main() {
-       // Parse flags and arguments, print usage message on error.
-       flag.Usage = func() {
-               fmt.Fprintf(os.Stderr, `
-Usage: %s url [url ...]
-Send messages to all the listed URLs concurrently.
-To each URL, send the string "path-n" where n is the message number.
-`, os.Args[0])
-               flag.PrintDefaults()
-       }
+       flag.Usage = usage
        flag.Parse()
+
        urls := flag.Args() // Non-flag arguments are URLs to receive from
        if len(urls) == 0 {
+               fmt.Fprintln(os.Stderr, "No URL provided")
                flag.Usage()
-               fmt.Fprintf(os.Stderr, "No URL provided\n")
                os.Exit(1)
        }
-       if *count == 0 {
-               *count = math.MaxInt64
-       }
-
-       // Create a channel to receive all the acknowledgements
-       acks := make(chan Ack)
 
-       // Create a goroutine for each URL that sends messages.
+       acks := make(chan Ack)  // Channel to receive all the acknowledgements
        var wait sync.WaitGroup // Used by main() to wait for all goroutines to 
end.
        wait.Add(len(urls))     // Wait for one goroutine per URL.
 
-       // Arrange to close all connections on exit
-       connections := make([]*messaging.Connection, len(urls))
-       defer func() {
-               for _, c := range connections {
-                       if c != nil {
-                               c.Close()
-                       }
-               }
-       }()
+       connections := make([]*messaging.Connection, len(urls)) // Store 
connctions to close on exit
 
+       // Start a goroutine for each URL to send messages, receive the 
acknowledgements and
+       // send them to the acks channel.
        for i, urlStr := range urls {
-               debug.Printf("Connecting to %v", urlStr)
+               debugf("Connecting to %v\n", urlStr)
                go func(urlStr string) {
+
                        defer wait.Done()                 // Notify main() that 
this goroutine is done.
                        url, err := amqp.ParseURL(urlStr) // Like 
net/url.Parse() but with AMQP defaults.
-                       fatalIf(err)
+                       exitIf(err)
 
                        // Open a standard Go net.Conn and and AMQP connection 
using it.
                        conn, err := net.Dial("tcp", url.Host) // Note 
net.URL.Host is actually "host:port"
-                       fatalIf(err)
+                       exitIf(err)
                        pc, err := messaging.Connect(conn) // This is our AMQP 
connection.
-                       fatalIf(err)
-                       connections[i] = pc // So we can close it when main() 
ends
+                       exitIf(err)
+                       connections[i] = pc // Save connection so it will be 
closed when main() ends
 
                        // Create a sender using the path of the URL as the 
AMQP address
                        s, err := pc.Sender(url.Path)
-                       fatalIf(err)
+                       exitIf(err)
 
+                       // Loop sending messages, receiving acknowledgements 
and sending them to the acks channel.
                        for i := int64(0); i < *count; i++ {
                                m := amqp.NewMessage()
                                body := fmt.Sprintf("%v-%v", url.Path, i)
                                m.SetBody(body)
+                               // Note Send is *asynchronous*, ack is a 
channel that will receive the acknowledgement.
                                ack, err := s.Send(m)
-                               fatalIf(err)
+                               exitIf(err)
                                acks <- Ack{ack, body} // Send the 
acknowledgement to main()
                        }
                }(urlStr)
@@ -113,44 +99,38 @@ To each URL, send the string "path-n" where n is the 
message number.
 
        // Wait for all the acknowledgements
        expect := int(*count) * len(urls)
-       debug.Printf("Started senders, expect %v acknowledgements", expect)
+       debugf("Started senders, expect %v acknowledgements\n", expect)
        for i := 0; i < expect; i++ {
                ack, ok := <-acks
                if !ok {
-                       info.Fatalf("acks channel closed after only %d acks\n", 
i)
+                       exitIf(fmt.Errorf("acks channel closed after only %d 
acks\n", i))
                }
                d := <-ack.ack
-               debug.Printf("acknowledgement[%v] %v", i, ack.info)
+               debugf("acknowledgement[%v] %v\n", i, ack.info)
                if d != messaging.Accepted {
-                       info.Printf("Unexpected disposition %v", d)
+                       fmt.Printf("Unexpected disposition %v\n", d)
                }
        }
-       info.Printf("Received all %v acknowledgements", expect)
-       wait.Wait() // Wait for all goroutines to finish.
-}
+       fmt.Printf("Received all %v acknowledgements\n", expect)
 
-// Logging
-func logger(prefix string, level int, w io.Writer) *log.Logger {
-       if *verbose >= level {
-               return log.New(w, prefix, 0)
+       wait.Wait()                     // Wait for all goroutines to finish.
+       for _, c := range connections { // Close all connections
+               if c != nil {
+                       c.Close()
+               }
        }
-       return log.New(ioutil.Discard, "", 0)
 }
 
-var info, debug *log.Logger
-
-func init() {
-       flag.Parse()
-       name := path.Base(os.Args[0])
-       log.SetFlags(0)                                               // Use 
default logger for errors.
-       log.SetPrefix(fmt.Sprintf("%s: ", name))                      // Log 
errors on stderr.
-       info = logger(fmt.Sprintf("%s: ", name), 1, os.Stdout)        // Log 
info on stdout.
-       debug = logger(fmt.Sprintf("%s debug: ", name), 2, os.Stderr) // Log 
debug on stderr.
+// Simple debug logging
+func debugf(format string, data ...interface{}) {
+       if *debug {
+               fmt.Fprintf(os.Stderr, format, data...)
+       }
 }
 
 // Simple error handling for demo.
-func fatalIf(err error) {
+func exitIf(err error) {
        if err != nil {
-               log.Fatal(err)
+               fmt.Fprintln(os.Stderr, err)
        }
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/495d7413/proton-c/bindings/go/genwrap.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/genwrap.go b/proton-c/bindings/go/genwrap.go
index 094b196..27e5966 100644
--- a/proton-c/bindings/go/genwrap.go
+++ b/proton-c/bindings/go/genwrap.go
@@ -35,6 +35,59 @@ import (
        "text/template"
 )
 
+var includeProton = "../../include/proton"
+var outpath = "src/qpid.apache.org/proton/go/event/wrappers_gen.go"
+
+func main() {
+       flag.Parse()
+       out, err := os.Create(outpath)
+       panicIf(err)
+       defer out.Close()
+
+       apis := []string{"session", "link", "delivery", "disposition", 
"condition", "terminus", "connection"}
+       fmt.Fprintln(out, copyright)
+       fmt.Fprint(out, `
+package event
+
+import (
+       "time"
+  "unsafe"
+  "qpid.apache.org/proton/go/internal"
+)
+
+// #include <proton/types.h>
+// #include <proton/event.h>
+// #include <stdlib.h>
+`)
+       for _, api := range apis {
+               fmt.Fprintf(out, "// #include <proton/%s.h>\n", api)
+       }
+       fmt.Fprintln(out, `import "C"`)
+
+       event(out)
+
+       for _, api := range apis {
+               fmt.Fprintf(out, "// Wrappers for declarations in %s.h\n\n", 
api)
+               header := readHeader(api)
+               enums := findEnums(header)
+               for _, e := range enums {
+                       genEnum(out, e.Name, e.Values)
+               }
+               apiWrapFns(api, header, out)
+       }
+       out.Close()
+
+       // Run gofmt.
+       cmd := exec.Command("gofmt", "-w", outpath)
+       cmd.Stdout = os.Stdout
+       cmd.Stderr = os.Stderr
+       err = cmd.Run()
+       if err != nil {
+               fmt.Fprintf(os.Stderr, "gofmt: %s", err)
+               os.Exit(1)
+       }
+}
+
 func mixedCase(s string) string {
        result := ""
        for _, w := range strings.Split(s, "_") {
@@ -96,7 +149,7 @@ func panicIf(err error) {
 }
 
 func readHeader(name string) string {
-       file, err := os.Open(path.Join(*includeProton, name+".h"))
+       file, err := os.Open(path.Join(includeProton, name+".h"))
        panicIf(err)
        defer file.Close()
        s, err := ioutil.ReadAll(file)
@@ -372,56 +425,3 @@ func apiWrapFns(api, header string, out io.Writer) {
                fmt.Fprintf(out, "}\n")
        }
 }
-
-var includeProton = flag.String("include", "", "path to proton include files, 
including /proton")
-
-func main() {
-       flag.Parse()
-       outpath := "wrappers_gen.go"
-       out, err := os.Create(outpath)
-       panicIf(err)
-       defer out.Close()
-
-       apis := []string{"session", "link", "delivery", "disposition", 
"condition", "terminus", "connection"}
-       fmt.Fprintln(out, copyright)
-       fmt.Fprint(out, `
-package event
-
-import (
-       "time"
-  "unsafe"
-  "qpid.apache.org/proton/go/internal"
-)
-
-// #include <proton/types.h>
-// #include <proton/event.h>
-// #include <stdlib.h>
-`)
-       for _, api := range apis {
-               fmt.Fprintf(out, "// #include <proton/%s.h>\n", api)
-       }
-       fmt.Fprintln(out, `import "C"`)
-
-       event(out)
-
-       for _, api := range apis {
-               fmt.Fprintf(out, "// Wrappers for declarations in %s.h\n\n", 
api)
-               header := readHeader(api)
-               enums := findEnums(header)
-               for _, e := range enums {
-                       genEnum(out, e.Name, e.Values)
-               }
-               apiWrapFns(api, header, out)
-       }
-       out.Close()
-
-       // Run gofmt.
-       cmd := exec.Command("gofmt", "-w", outpath)
-       cmd.Stdout = os.Stdout
-       cmd.Stderr = os.Stderr
-       err = cmd.Run()
-       if err != nil {
-               fmt.Fprintf(os.Stderr, "gofmt: %s", err)
-               os.Exit(1)
-       }
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/495d7413/proton-c/bindings/go/src/qpid.apache.org/proton/go/event/pump.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/go/event/pump.go 
b/proton-c/bindings/go/src/qpid.apache.org/proton/go/event/pump.go
index db022ff..73db513 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/go/event/pump.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/proton/go/event/pump.go
@@ -101,7 +101,7 @@ you are doing something fairly low-level it is probably a 
better choice.
 */
 type Pump struct {
        // Error is set on exit from Run() if there was an error.
-       Error error
+       Error error // FIXME aconway 2015-05-26: make it a function
        // Channel to inject functions to be executed in the Pump's proton 
event loop.
        Inject chan func()
 
@@ -212,6 +212,9 @@ func (p *Pump) Close() error {
        }
        delete(pumps, p.connection)
        p.free()
+       if p.Error == io.EOF {
+               return nil
+       }
        return p.Error
 }
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/495d7413/proton-c/bindings/go/src/qpid.apache.org/proton/go/internal/error.go
----------------------------------------------------------------------
diff --git 
a/proton-c/bindings/go/src/qpid.apache.org/proton/go/internal/error.go 
b/proton-c/bindings/go/src/qpid.apache.org/proton/go/internal/error.go
index 01ba890..f3f3307 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/go/internal/error.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/proton/go/internal/error.go
@@ -29,7 +29,6 @@ import (
        "fmt"
        "runtime"
        "sync"
-       "sync/atomic"
        "unsafe"
 )
 
@@ -105,21 +104,23 @@ func panicIf(condition bool, fmt string, args 
...interface{}) {
 
 // FirstError is a goroutine-safe error holder that keeps the first error that 
is set.
 type FirstError struct {
-       err  atomic.Value
-       once sync.Once
+       err  error
+       lock sync.Mutex
 }
 
-// Set the error if not allread set.
-func (e *FirstError) Set(err error) {
-       e.once.Do(func() { e.err.Store(err) })
+// Set the error if not already set, return the error.
+func (e *FirstError) Set(err error) error {
+       e.lock.Lock()
+       defer e.lock.Unlock()
+       if e.err == nil {
+               e.err = err
+       }
+       return e.err
 }
 
 // Get the error.
 func (e *FirstError) Get() error {
-       v := e.err.Load()
-       if v != nil {
-               return v.(error)
-       } else {
-               return nil
-       }
+       e.lock.Lock()
+       defer e.lock.Unlock()
+       return e.err
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/495d7413/proton-c/bindings/go/src/qpid.apache.org/proton/go/messaging/messaging.go
----------------------------------------------------------------------
diff --git 
a/proton-c/bindings/go/src/qpid.apache.org/proton/go/messaging/messaging.go 
b/proton-c/bindings/go/src/qpid.apache.org/proton/go/messaging/messaging.go
index e653de2..e4b117d 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/go/messaging/messaging.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/proton/go/messaging/messaging.go
@@ -23,11 +23,16 @@ package messaging
 import "C"
 
 import (
+       "io"
        "net"
        "qpid.apache.org/proton/go/amqp"
        "qpid.apache.org/proton/go/event"
+       "qpid.apache.org/proton/go/internal"
 )
 
+// Closed is an alias for io.EOF. It indicates orderly closure of an endpoint.
+var Closed = io.EOF
+
 // Connection is a connection to a remote AMQP endpoint.
 //
 // You can set exported fields to configure the connection before calling
@@ -42,27 +47,27 @@ type Connection struct {
        handler *handler
        pump    *event.Pump
        session Session
+       err     internal.FirstError
 }
 
+// Error returns nil if the connection is open, messaging.Closed if was closed 
cleanly
+// or an error value if it was closed due to an error.
+func (c *Connection) Error() error { return c.err.Get() }
+
 // Make an AMQP connection over a net.Conn connection.
-//
-// Use Connection.Close() to close the Connection, this will also close conn.
-// Using conn.Close() directly will cause an abrupt disconnect rather than an
-// orderly AMQP close.
-//
+// You must call c.Close() to close the connection and clean up its resources.
 func (c *Connection) Open(conn net.Conn) (err error) {
        c.handler = newHandler(c)
        c.pump, err = event.NewPump(conn,
                event.NewMessagingDelegator(c.handler),
        )
-       if err != nil {
-               return err
-       }
-       if c.Server {
-               c.pump.Server()
+       if err == nil {
+               if c.Server {
+                       c.pump.Server()
+               }
+               go c.pump.Run()
        }
-       go c.pump.Run()
-       return nil
+       return c.err.Set(err)
 }
 
 // Connect opens a default client connection. It is a shortcut for
@@ -71,14 +76,16 @@ func (c *Connection) Open(conn net.Conn) (err error) {
 //
 func Connect(conn net.Conn) (*Connection, error) {
        c := &Connection{}
-       err := c.Open(conn)
-       return c, err
+       c.err.Set(c.Open(conn))
+       return c, c.Error()
 }
 
-// Close the connection.
-//
-// Connections must be closed to clean up resources and stop associated 
goroutines.
-func (c *Connection) Close() error { return c.pump.Close() }
+// Close cleans up resources and closes the associated net.Conn connection.
+func (c *Connection) Close() error {
+       err := c.pump.Close()   // Will be nil on close OK
+       c.err.Set(c.pump.Error) // Will be io.EOF on close OK
+       return err
+}
 
 // DefaultSession returns a default session for the connection.
 //
@@ -86,6 +93,9 @@ func (c *Connection) Close() error { return c.pump.Close() }
 // Use Session() for more control over creating sessions.
 //
 func (c *Connection) DefaultSession() (s Session, err error) {
+       if c.Error() != nil {
+               return Session{}, c.Error()
+       }
        if c.session.e.IsNil() {
                c.session, err = c.Session()
        }
@@ -237,14 +247,15 @@ func (s *Sender) Send(m amqp.Message) (ack 
Acknowledgement, err error) {
 // Close the sender.
 func (s *Sender) Close() error { return nil } // FIXME aconway 2015-04-27: 
close/free
 
-// Receiver receives messages via the channel Receive.
 type Receiver struct {
        Link
-       // Channel of messag
+       // Channel to receive messages. When it closes, check Receiver.Error() 
for an error.
        Receive <-chan amqp.Message
 }
 
 // FIXME aconway 2015-04-29: settlement - ReceivedMessage with Settle() method?
 
+// FIXME aconway 2015-05-25:  Close must unblock Receive() calls.
+
 // Close the Receiver.
 func (r *Receiver) Close() error { return nil } // FIXME aconway 2015-04-29: 
close/free


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to