PROTON-1910: [go] test refactor and benchmarks

- Simplify common test tools, move to common_test.go
- Use net.Pipe for most tests, more efficient than a full network socket
- Added simple benchmarks


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/d9b4b989
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/d9b4b989
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/d9b4b989

Branch: refs/heads/master
Commit: d9b4b9893412676e947d693a7d4e336fad4d4110
Parents: 486fbaf
Author: Alan Conway <acon...@redhat.com>
Authored: Fri Sep 21 00:51:45 2018 -0400
Committer: Alan Conway <acon...@redhat.com>
Committed: Thu Oct 11 12:32:59 2018 -0400

----------------------------------------------------------------------
 go/src/qpid.apache.org/amqp/message_test.go     | 138 ++++++--
 go/src/qpid.apache.org/electron/auth_test.go    |  39 +--
 .../qpid.apache.org/electron/benchmark_test.go  | 132 ++++++++
 go/src/qpid.apache.org/electron/common_test.go  | 148 ++++++++
 .../qpid.apache.org/electron/electron_test.go   | 337 ++++---------------
 5 files changed, 459 insertions(+), 335 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d9b4b989/go/src/qpid.apache.org/amqp/message_test.go
----------------------------------------------------------------------
diff --git a/go/src/qpid.apache.org/amqp/message_test.go 
b/go/src/qpid.apache.org/amqp/message_test.go
index 663e82f..b22c60d 100644
--- a/go/src/qpid.apache.org/amqp/message_test.go
+++ b/go/src/qpid.apache.org/amqp/message_test.go
@@ -20,6 +20,7 @@ under the License.
 package amqp
 
 import (
+       "reflect"
        "testing"
        "time"
 )
@@ -28,9 +29,7 @@ func roundTrip(m Message) error {
        var err error
        if buffer, err := m.Encode(nil); err == nil {
                if m2, err := DecodeMessage(buffer); err == nil {
-                       if err = checkEqual(m, m2); err == nil {
-                               err = checkEqual(m.String(), m2.String())
-                       }
+                       err = checkEqual(m, m2)
                }
        }
        return err
@@ -38,35 +37,40 @@ func roundTrip(m Message) error {
 
 func TestDefaultMessage(t *testing.T) {
        m := NewMessage()
+       mv := reflect.ValueOf(m)
        // Check defaults
-       for _, data := range [][]interface{}{
-               {m.Inferred(), false},
-               {m.Durable(), false},
-               {m.Priority(), uint8(4)},
-               {m.TTL(), time.Duration(0)},
-               {m.UserId(), ""},
-               {m.Address(), ""},
-               {m.Subject(), ""},
-               {m.ReplyTo(), ""},
-               {m.ContentType(), ""},
-               {m.ContentEncoding(), ""},
-               {m.GroupId(), ""},
-               {m.GroupSequence(), int32(0)},
-               {m.ReplyToGroupId(), ""},
-               {m.MessageId(), nil},
-               {m.CorrelationId(), nil},
-               {m.DeliveryAnnotations(), map[AnnotationKey]interface{}(nil)},
-               {m.MessageAnnotations(), map[AnnotationKey]interface{}(nil)},
-               {m.ApplicationProperties(), map[string]interface{}(nil)},
+       for _, x := range []struct {
+               method string
+               want   interface{}
+       }{
+               {"Inferred", false},
+               {"Durable", false},
+               {"Priority", uint8(4)},
+               {"TTL", time.Duration(0)},
+               {"UserId", ""},
+               {"Address", ""},
+               {"Subject", ""},
+               {"ReplyTo", ""},
+               {"ContentType", ""},
+               {"ContentEncoding", ""},
+               {"GroupId", ""},
+               {"GroupSequence", int32(0)},
+               {"ReplyToGroupId", ""},
+               {"MessageId", nil},
+               {"CorrelationId", nil},
+               {"DeliveryAnnotations", map[AnnotationKey]interface{}(nil)},
+               {"MessageAnnotations", map[AnnotationKey]interface{}(nil)},
+               {"ApplicationProperties", map[string]interface{}(nil)},
 
                // Deprecated
-               {m.Instructions(), map[string]interface{}(nil)},
-               {m.Annotations(), map[string]interface{}(nil)},
-               {m.Properties(), map[string]interface{}(nil)},
-               {m.Body(), nil},
+               {"Instructions", map[string]interface{}(nil)},
+               {"Annotations", map[string]interface{}(nil)},
+               {"Properties", map[string]interface{}(nil)},
+               {"Body", nil},
        } {
-               if err := checkEqual(data[0], data[1]); err != nil {
-                       t.Error(err)
+               ret := mv.MethodByName(x.method).Call(nil)
+               if err := checkEqual(x.want, ret[0].Interface()); err != nil {
+                       t.Errorf("%s: %s", x.method, err)
                }
        }
        if err := roundTrip(m); err != nil {
@@ -84,14 +88,17 @@ func TestMessageString(t *testing.T) {
        
m.SetDeliveryAnnotations(map[AnnotationKey]interface{}{AnnotationKeySymbol("instructions"):
 "foo"})
        
m.SetMessageAnnotations(map[AnnotationKey]interface{}{AnnotationKeySymbol("annotations"):
 "bar"})
        m.SetApplicationProperties(map[string]interface{}{"int": int32(32)})
+       if err := roundTrip(m); err != nil {
+               t.Error(err)
+       }
        msgstr := `Message{user_id="user", instructions={:instructions="foo"}, 
annotations={:annotations="bar"}, properties={"int"=32}, body="hello"}`
        if err := checkEqual(msgstr, m.String()); err != nil {
                t.Error(err)
        }
 }
 
-func TestMessageRoundTrip(t *testing.T) {
-       m := NewMessage()
+// Set all message properties
+func setMessageProperties(m Message) Message {
        m.SetInferred(false)
        m.SetDurable(true)
        m.SetPriority(42)
@@ -110,7 +117,25 @@ func TestMessageRoundTrip(t *testing.T) {
        
m.SetDeliveryAnnotations(map[AnnotationKey]interface{}{AnnotationKeySymbol("instructions"):
 "foo"})
        
m.SetMessageAnnotations(map[AnnotationKey]interface{}{AnnotationKeySymbol("annotations"):
 "bar"})
        m.SetApplicationProperties(map[string]interface{}{"int": int32(32), 
"bool": true})
-       m.Marshal("hello")
+       return m
+}
+
+func TestMessageRoundTrip(t *testing.T) {
+       m1 := NewMessage()
+       setMessageProperties(m1)
+       m1.Marshal("hello")
+
+       buffer, err := m1.Encode(nil)
+       if err != nil {
+               t.Fatal(err)
+       }
+       m, err := DecodeMessage(buffer)
+       if err != nil {
+               t.Fatal(err)
+       }
+       if err = checkEqual(m1, m); err != nil {
+               t.Error(err)
+       }
 
        for _, data := range [][]interface{}{
                {m.Inferred(), false},
@@ -215,3 +240,54 @@ func TestMessageBodyTypes(t *testing.T) {
 
        // TODO aconway 2015-09-08: array etc.
 }
+
+// Benchmarks assign to package-scope variables to prevent being optimized out.
+var bmM Message
+var bmBuf []byte
+
+func BenchmarkNewMessageEmpty(b *testing.B) {
+       for n := 0; n < b.N; n++ {
+               bmM = NewMessage()
+       }
+}
+
+func BenchmarkNewMessageString(b *testing.B) {
+       for n := 0; n < b.N; n++ {
+               bmM = NewMessageWith("hello")
+       }
+}
+
+func BenchmarkNewMessageAll(b *testing.B) {
+       for n := 0; n < b.N; n++ {
+               bmM = setMessageProperties(NewMessageWith("hello"))
+       }
+}
+
+func BenchmarkEncode(b *testing.B) {
+       m := setMessageProperties(NewMessageWith("hello"))
+       var buf []byte
+       b.ResetTimer()
+       for n := 0; n < b.N; n++ {
+               buf, err := m.Encode(buf)
+               if err != nil {
+                       b.Fatal(err)
+               }
+               bmBuf = buf
+       }
+}
+
+func BenchmarkDecode(b *testing.B) {
+       var buf []byte
+       buf, err := setMessageProperties(NewMessageWith("hello")).Encode(buf)
+       if err != nil {
+               b.Fatal(err)
+       }
+       m := NewMessage()
+       b.ResetTimer()
+       for n := 0; n < b.N; n++ {
+               if err := m.Decode(buf); err != nil {
+                       b.Fatal(err)
+               }
+               bmM = m
+       }
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d9b4b989/go/src/qpid.apache.org/electron/auth_test.go
----------------------------------------------------------------------
diff --git a/go/src/qpid.apache.org/electron/auth_test.go 
b/go/src/qpid.apache.org/electron/auth_test.go
index 162b366..30ef401 100644
--- a/go/src/qpid.apache.org/electron/auth_test.go
+++ b/go/src/qpid.apache.org/electron/auth_test.go
@@ -29,57 +29,40 @@ import (
        "testing"
 )
 
-func testAuthClientServer(t *testing.T, copts []ConnectionOption, sopts 
[]ConnectionOption) (got connectionSettings, err error) {
-       client, server := newClientServerOpts(t, copts, sopts)
-       defer closeClientServer(client, server)
-
-       go func() {
-               for in := range server.Incoming() {
-                       switch in := in.(type) {
-                       case *IncomingConnection:
-                               got = connectionSettings{user: in.User(), 
virtualHost: in.VirtualHost()}
-                       }
-                       in.Accept()
-               }
-       }()
-
-       err = client.Sync()
-       return
-}
-
 func TestAuthAnonymous(t *testing.T) {
-       got, err := testAuthClientServer(t,
+       p := newPipe(t,
                []ConnectionOption{User("fred"), VirtualHost("vhost"), 
SASLAllowInsecure(true)},
                []ConnectionOption{SASLAllowedMechs("ANONYMOUS"), 
SASLAllowInsecure(true)})
-       fatalIf(t, err)
-       errorIf(t, checkEqual(connectionSettings{user: "anonymous", 
virtualHost: "vhost"}, got))
+       fatalIf(t, p.server.Sync())
+       errorIf(t, checkEqual("anonymous", p.server.User()))
+       errorIf(t, checkEqual("vhost", p.server.VirtualHost()))
 }
 
 func TestAuthPlain(t *testing.T) {
        extendedSASL.startTest(t)
-       got, err := testAuthClientServer(t,
+       p := newPipe(t,
                []ConnectionOption{SASLAllowInsecure(true), 
SASLAllowedMechs("PLAIN"), User("fred@proton"), Password([]byte("xxx"))},
                []ConnectionOption{SASLAllowInsecure(true), 
SASLAllowedMechs("PLAIN")})
-       fatalIf(t, err)
-       errorIf(t, checkEqual(connectionSettings{user: "fred@proton"}, got))
+       fatalIf(t, p.server.Sync())
+       errorIf(t, checkEqual("fred@proton", p.server.User()))
 }
 
 func TestAuthBadPass(t *testing.T) {
        extendedSASL.startTest(t)
-       _, err := testAuthClientServer(t,
+       p := newPipe(t,
                []ConnectionOption{SASLAllowInsecure(true), 
SASLAllowedMechs("PLAIN"), User("fred@proton"), Password([]byte("yyy"))},
                []ConnectionOption{SASLAllowInsecure(true), 
SASLAllowedMechs("PLAIN")})
-       if err == nil {
+       if p.server.Sync() == nil {
                t.Error("Expected auth failure for bad pass")
        }
 }
 
 func TestAuthBadUser(t *testing.T) {
        extendedSASL.startTest(t)
-       _, err := testAuthClientServer(t,
+       p := newPipe(t,
                []ConnectionOption{SASLAllowInsecure(true), 
SASLAllowedMechs("PLAIN"), User("foo@bar"), Password([]byte("yyy"))},
                []ConnectionOption{SASLAllowInsecure(true), 
SASLAllowedMechs("PLAIN")})
-       if err == nil {
+       if p.server.Sync() == nil {
                t.Error("Expected auth failure for bad user")
        }
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d9b4b989/go/src/qpid.apache.org/electron/benchmark_test.go
----------------------------------------------------------------------
diff --git a/go/src/qpid.apache.org/electron/benchmark_test.go 
b/go/src/qpid.apache.org/electron/benchmark_test.go
new file mode 100644
index 0000000..ae9d47c
--- /dev/null
+++ b/go/src/qpid.apache.org/electron/benchmark_test.go
@@ -0,0 +1,132 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package electron
+
+import (
+       "flag"
+       "strings"
+       "sync"
+       "testing"
+
+       "qpid.apache.org/amqp"
+)
+
+// To change capacity use
+//     go test -bench=. -args -capacity 100
+var capacity = flag.Int("capacity", 1000, "Prefetch capacity")
+var bodySize = flag.Int("bodySize", 1000, "Message body size")
+
+type bmCommon struct {
+       b    *testing.B
+       p    *pair
+       s    Sender
+       r    Receiver
+       ack  chan Outcome
+       done sync.WaitGroup
+}
+
+func makeBmCommon(p *pair, waitCount int) bmCommon {
+       bm := bmCommon{p: p, b: p.t.(*testing.B)}
+       bm.p.capacity = *capacity
+       bm.p.prefetch = true
+       bm.s, bm.r = p.sender()
+       bm.ack = make(chan Outcome, *capacity)
+       bm.done.Add(waitCount)
+       bm.b.ResetTimer()
+       return bm
+}
+
+func (bm *bmCommon) receiveAccept() {
+       defer bm.done.Done()
+       for n := 0; n < bm.b.N; n++ {
+               if rm, err := bm.r.Receive(); err != nil {
+                       bm.b.Fatal(err)
+               } else {
+                       fatalIf(bm.b, rm.Accept())
+               }
+       }
+}
+
+func (bm *bmCommon) outcomes() {
+       defer bm.done.Done()
+       for n := 0; n < bm.b.N; n++ {
+               fatalIf(bm.b, (<-bm.ack).Error)
+       }
+}
+
+var emptyMsg = amqp.NewMessage()
+
+func BenchmarkSendForget(b *testing.B) {
+       bm := makeBmCommon(newPipe(b, nil, nil), 1)
+       defer bm.p.close()
+
+       go func() { // Receive, no ack
+               defer bm.done.Done()
+               for n := 0; n < b.N; n++ {
+                       if _, err := bm.r.Receive(); err != nil {
+                               b.Fatal(err)
+                       }
+               }
+       }()
+
+       for n := 0; n < b.N; n++ {
+               bm.s.SendForget(emptyMsg)
+       }
+       bm.done.Wait()
+}
+
+func BenchmarkSendSync(b *testing.B) {
+       bm := makeBmCommon(newPipe(b, nil, nil), 1)
+       defer bm.p.close()
+
+       go bm.receiveAccept()
+       for n := 0; n < b.N; n++ {
+               fatalIf(b, bm.s.SendSync(emptyMsg).Error)
+       }
+       bm.done.Wait()
+}
+
+func BenchmarkSendAsync(b *testing.B) {
+       bm := makeBmCommon(newPipe(b, nil, nil), 2)
+       defer bm.p.close()
+
+       go bm.outcomes()      // Handle outcomes
+       go bm.receiveAccept() // Receive
+       for n := 0; n < b.N; n++ {
+               bm.s.SendAsync(emptyMsg, bm.ack, nil)
+       }
+       bm.done.Wait()
+}
+
+// Create a new message for each send, with body and property.
+func BenchmarkSendAsyncNewMessage(b *testing.B) {
+       body := strings.Repeat("x", *bodySize)
+       bm := makeBmCommon(newPipe(b, nil, nil), 2)
+       defer bm.p.close()
+
+       go bm.outcomes()      // Handle outcomes
+       go bm.receiveAccept() // Receive
+       for n := 0; n < b.N; n++ {
+               msg := amqp.NewMessageWith(body)
+               msg.SetApplicationProperties(map[string]interface{}{"prop": 
"value"})
+               bm.s.SendAsync(msg, bm.ack, nil)
+       }
+       bm.done.Wait()
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d9b4b989/go/src/qpid.apache.org/electron/common_test.go
----------------------------------------------------------------------
diff --git a/go/src/qpid.apache.org/electron/common_test.go 
b/go/src/qpid.apache.org/electron/common_test.go
new file mode 100644
index 0000000..3aad825
--- /dev/null
+++ b/go/src/qpid.apache.org/electron/common_test.go
@@ -0,0 +1,148 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package electron
+
+import (
+       "fmt"
+       "net"
+       "path"
+       "reflect"
+       "runtime"
+       "testing"
+)
+
+func decorate(err error, callDepth int) string {
+       _, file, line, _ := runtime.Caller(callDepth + 1) // annotate with 
location of caller.
+       _, file = path.Split(file)
+       return fmt.Sprintf("\n%s:%d: %v", file, line, err)
+}
+
+func fatalIfN(t testing.TB, err error, callDepth int) {
+       if err != nil {
+               t.Fatal(decorate(err, callDepth+1))
+       }
+}
+
+func fatalIf(t testing.TB, err error) {
+       fatalIfN(t, err, 1)
+}
+
+func errorIf(t testing.TB, err error) {
+       if err != nil {
+               t.Errorf(decorate(err, 1))
+       }
+}
+
+func checkEqual(want interface{}, got interface{}) error {
+       if !reflect.DeepEqual(want, got) {
+               return fmt.Errorf("(%#v != %#v)", want, got)
+       }
+       return nil
+}
+
+// AMQP client/server pair
+type pair struct {
+       t        testing.TB
+       client   Session
+       server   Connection
+       capacity int
+       prefetch bool
+       rchan    chan Receiver
+       schan    chan Sender
+       auth     connectionSettings
+}
+
+func newPair(t testing.TB, cli, srv net.Conn, clientOpts, serverOpts 
[]ConnectionOption) *pair {
+       opts := append([]ConnectionOption{Server()}, serverOpts...)
+       sc, _ := NewConnection(srv, opts...)
+       opts = append([]ConnectionOption{}, clientOpts...)
+       cc, _ := NewConnection(cli, opts...)
+       cs, _ := cc.Session()
+       p := &pair{
+               t:        t,
+               client:   cs,
+               server:   sc,
+               capacity: 100,
+               rchan:    make(chan Receiver),
+               schan:    make(chan Sender)}
+
+       go func() {
+               for i := range p.server.Incoming() {
+                       switch i := i.(type) {
+                       case *IncomingReceiver:
+                               if p.capacity > 0 {
+                                       i.SetCapacity(p.capacity)
+                               }
+                               i.SetPrefetch(p.prefetch)
+                               p.rchan <- i.Accept().(Receiver)
+                               break
+                       case *IncomingSender:
+                               p.schan <- i.Accept().(Sender)
+                       default:
+                               i.Accept()
+                       }
+               }
+       }()
+
+       return p
+}
+
+// AMQP pair linked by in-memory pipe
+func newPipe(t testing.TB, clientOpts, serverOpts []ConnectionOption) *pair {
+       cli, srv := net.Pipe()
+       return newPair(t, cli, srv, clientOpts, serverOpts)
+}
+
+// AMQP pair linked by TCP socket
+func newSocketPair(t testing.TB, clientOpts, serverOpts []ConnectionOption) 
*pair {
+       l, err := net.Listen("tcp4", ":0") // For systems with ipv6 disabled
+       fatalIfN(t, err, 1)
+       srvCh := make(chan net.Conn)
+       var srvErr error
+       go func() {
+               var c net.Conn
+               c, srvErr = l.Accept()
+               srvCh <- c
+       }()
+       addr := l.Addr()
+       cli, err := net.Dial(addr.Network(), addr.String())
+       fatalIfN(t, err, 1)
+       srv := <-srvCh
+       fatalIfN(t, srvErr, 1)
+       return newPair(t, cli, srv, clientOpts, serverOpts)
+}
+
+func (p *pair) close() { p.client.Connection().Close(nil); p.server.Close(nil) 
}
+
+// Return a client sender and server receiver
+func (p *pair) sender(opts ...LinkOption) (Sender, Receiver) {
+       snd, err := p.client.Sender(opts...)
+       fatalIfN(p.t, err, 2)
+       rcv := <-p.rchan
+       return snd, rcv
+}
+
+// Return a client receiver and server sender
+func (p *pair) receiver(opts ...LinkOption) (Receiver, Sender) {
+       rcv, err := p.client.Receiver(opts...)
+       fatalIfN(p.t, err, 2)
+       snd := <-p.schan
+       return rcv, snd
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d9b4b989/go/src/qpid.apache.org/electron/electron_test.go
----------------------------------------------------------------------
diff --git a/go/src/qpid.apache.org/electron/electron_test.go 
b/go/src/qpid.apache.org/electron/electron_test.go
index 74759f5..c8a51c7 100644
--- a/go/src/qpid.apache.org/electron/electron_test.go
+++ b/go/src/qpid.apache.org/electron/electron_test.go
@@ -21,119 +21,23 @@ package electron
 
 import (
        "fmt"
-       "net"
-       "path"
        "qpid.apache.org/amqp"
-       "reflect"
-       "runtime"
        "testing"
        "time"
 )
 
-func fatalIf(t *testing.T, err error) {
-       if err != nil {
-               _, file, line, ok := runtime.Caller(1) // annotate with 
location of caller.
-               if ok {
-                       _, file = path.Split(file)
-               }
-               t.Fatalf("(from %s:%d) %v", file, line, err)
-       }
-}
-
-func errorIf(t *testing.T, err error) {
-       if err != nil {
-               _, file, line, ok := runtime.Caller(1) // annotate with 
location of caller.
-               if ok {
-                       _, file = path.Split(file)
-               }
-               t.Errorf("(from %s:%d) %v", file, line, err)
-       }
-}
-
-func checkEqual(want interface{}, got interface{}) error {
-       if !reflect.DeepEqual(want, got) {
-               return fmt.Errorf("%#v != %#v", want, got)
-       }
-       return nil
-}
-
-// Start a server, return listening addr and channel for incoming Connections.
-func newServer(t *testing.T, cont Container, opts ...ConnectionOption) 
(net.Addr, <-chan Connection) {
-       listener, err := net.Listen("tcp4", "") // For systems with ipv6 
disabled
-       fatalIf(t, err)
-       addr := listener.Addr()
-       ch := make(chan Connection)
-       go func() {
-               conn, err := listener.Accept()
-               c, err := cont.Connection(conn, 
append([]ConnectionOption{Server()}, opts...)...)
-               fatalIf(t, err)
-               ch <- c
-       }()
-       return addr, ch
-}
-
-// Open a client connection and session, return the session.
-func newClient(t *testing.T, cont Container, addr net.Addr, opts 
...ConnectionOption) Session {
-       conn, err := net.Dial(addr.Network(), addr.String())
-       fatalIf(t, err)
-       // Don't  bother checking error here, it's an async error so it's racy 
to do so anyway.
-       // Let caller use Sync() or catch it on first use.
-       c, _ := cont.Connection(conn, opts...)
-       sn, _ := c.Session()
-       return sn
-}
-
-// Return client and server ends of the same connection.
-func newClientServerOpts(t *testing.T, copts []ConnectionOption, sopts 
[]ConnectionOption) (client Session, server Connection) {
-       addr, ch := newServer(t, NewContainer("test-server"), sopts...)
-       client = newClient(t, NewContainer("test-client"), addr, copts...)
-       return client, <-ch
-}
-
-// Return client and server ends of the same connection.
-func newClientServer(t *testing.T) (client Session, server Connection) {
-       return newClientServerOpts(t, nil, nil)
-}
-
-// Close client and server
-func closeClientServer(client Session, server Connection) {
-       client.Connection().Close(nil)
-       server.Close(nil)
-}
-
 // Send a message one way with a client sender and server receiver, verify ack.
-func TestClientSendServerReceive(t *testing.T) {
+func TestClientSender(t *testing.T) {
+       p := newPipe(t, nil, nil)
+       defer func() { p.close() }()
+
        nLinks := 3
        nMessages := 3
 
-       rchan := make(chan Receiver, nLinks)
-       client, server := newClientServer(t)
-       go func() {
-               for in := range server.Incoming() {
-                       switch in := in.(type) {
-                       case *IncomingReceiver:
-                               in.SetCapacity(1)
-                               in.SetPrefetch(false)
-                               rchan <- in.Accept().(Receiver)
-                       default:
-                               in.Accept()
-                       }
-               }
-       }()
-
-       defer func() { closeClientServer(client, server) }()
-
        s := make([]Sender, nLinks)
-       for i := 0; i < nLinks; i++ {
-               var err error
-               s[i], err = client.Sender(Target(fmt.Sprintf("foo%d", i)))
-               if err != nil {
-                       t.Fatal(err)
-               }
-       }
        r := make([]Receiver, nLinks)
        for i := 0; i < nLinks; i++ {
-               r[i] = <-rchan
+               s[i], r[i] = p.sender(Target(fmt.Sprintf("foo%d", i)))
        }
 
        for i := 0; i < nLinks; i++ {
@@ -146,16 +50,12 @@ func TestClientSendServerReceive(t *testing.T) {
                                m := 
amqp.NewMessageWith(fmt.Sprintf("foobar%v-%v", i, j))
                                var err error
                                s[i].SendAsync(m, ack, "testing")
-                               if err != nil {
-                                       t.Fatal(err)
-                               }
+                               fatalIf(t, err)
                        }()
 
                        // Server receive
                        rm, err := r[i].Receive()
-                       if err != nil {
-                               t.Fatal(err)
-                       }
+                       fatalIf(t, err)
                        if want, got := interface{}(fmt.Sprintf("foobar%v-%v", 
i, j)), rm.Message.Body(); want != got {
                                t.Errorf("%#v != %#v", want, got)
                        }
@@ -182,76 +82,28 @@ func TestClientSendServerReceive(t *testing.T) {
 
 func TestClientReceiver(t *testing.T) {
        nMessages := 3
-       client, server := newClientServer(t)
+       p := newPipe(t, nil, nil)
+       defer func() { p.close() }()
+       r, s := p.receiver(Source("foo"), Capacity(nMessages), Prefetch(true))
        go func() {
-               for in := range server.Incoming() {
-                       switch in := in.(type) {
-                       case *IncomingSender:
-                               s := in.Accept().(Sender)
-                               go func() {
-                                       for i := int32(0); i < 
int32(nMessages); i++ {
-                                               out := 
s.SendSync(amqp.NewMessageWith(i))
-                                               if out.Error != nil {
-                                                       t.Error(out.Error)
-                                                       return
-                                               }
-                                       }
-                                       s.Close(nil)
-                               }()
-                       default:
-                               in.Accept()
-                       }
+               for i := 0; i < nMessages; i++ { // Server sends
+                       out := s.SendSync(amqp.NewMessageWith(int32(i)))
+                       fatalIf(t, out.Error)
                }
        }()
-
-       r, err := client.Receiver(Source("foo"))
-       if err != nil {
-               t.Fatal(err)
-       }
-       for i := int32(0); i < int32(nMessages); i++ {
+       for i := 0; i < nMessages; i++ { // Client receives
                rm, err := r.Receive()
-               if err != nil {
-                       if err != Closed {
-                               t.Error(err)
-                       }
-                       break
-               }
-               if err := rm.Accept(); err != nil {
-                       t.Error(err)
-               }
-               if b, ok := rm.Message.Body().(int32); !ok || b != i {
-                       t.Errorf("want %v, true got %v, %v", i, b, ok)
-               }
+               fatalIf(t, err)
+               errorIf(t, checkEqual(int32(i), rm.Message.Body()))
+               errorIf(t, rm.Accept())
        }
-       server.Close(nil)
-       client.Connection().Close(nil)
 }
 
 // Test timeout versions of waiting functions.
 func TestTimeouts(t *testing.T) {
-       var err error
-       rchan := make(chan Receiver, 1)
-       client, server := newClientServer(t)
-       go func() {
-               for i := range server.Incoming() {
-                       switch i := i.(type) {
-                       case *IncomingReceiver:
-                               i.SetCapacity(1)
-                               i.SetPrefetch(false)
-                               rchan <- i.Accept().(Receiver) // Issue credit 
only on receive
-                       default:
-                               i.Accept()
-                       }
-               }
-       }()
-       defer func() { closeClientServer(client, server) }()
-
-       // Open client sender
-       snd, err := client.Sender(Target("test"))
-       if err != nil {
-               t.Fatal(err)
-       }
-       rcv := <-rchan
+       p := newPipe(t, nil, nil)
+       defer func() { p.close() }()
+       snd, rcv := p.sender(Target("test"))
 
        // Test send with timeout
        short := time.Millisecond
@@ -264,11 +116,11 @@ func TestTimeouts(t *testing.T) {
                t.Error("want Timeout got", err)
        }
        // Test receive with timeout
-       if _, err = rcv.ReceiveTimeout(0); err != Timeout { // No credit, 
expect timeout.
+       if _, err := rcv.ReceiveTimeout(0); err != Timeout { // No credit, 
expect timeout.
                t.Error("want Timeout got", err)
        }
        // Test receive with timeout
-       if _, err = rcv.ReceiveTimeout(short); err != Timeout { // No credit, 
expect timeout.
+       if _, err := rcv.ReceiveTimeout(short); err != Timeout { // No credit, 
expect timeout.
                t.Error("want Timeout got", err)
        }
        // There is now a credit on the link due to receive
@@ -295,57 +147,6 @@ func TestTimeouts(t *testing.T) {
        }
 }
 
-// A server that returns the opposite end of each client link via channels.
-type pairs struct {
-       t        *testing.T
-       client   Session
-       server   Connection
-       rchan    chan Receiver
-       schan    chan Sender
-       capacity int
-       prefetch bool
-}
-
-func newPairs(t *testing.T, capacity int, prefetch bool) *pairs {
-       p := &pairs{t: t, rchan: make(chan Receiver, 1), schan: make(chan Sender, 1)}
-       p.client, p.server = newClientServer(t)
-       go func() {
-               for i := range p.server.Incoming() {
-                       switch i := i.(type) {
-                       case *IncomingReceiver:
-                               i.SetCapacity(capacity)
-                               i.SetPrefetch(prefetch)
-                               p.rchan <- i.Accept().(Receiver)
-                       case *IncomingSender:
-                               p.schan <- i.Accept().(Sender)
-                       default:
-                               i.Accept()
-                       }
-               }
-       }()
-       return p
-}
-
-func (p *pairs) close() {
-       closeClientServer(p.client, p.server)
-}
-
-// Return a client sender and server receiver
-func (p *pairs) senderReceiver() (Sender, Receiver) {
-       snd, err := p.client.Sender()
-       fatalIf(p.t, err)
-       rcv := <-p.rchan
-       return snd, rcv
-}
-
-// Return a client receiver and server sender
-func (p *pairs) receiverSender() (Receiver, Sender) {
-       rcv, err := p.client.Receiver()
-       fatalIf(p.t, err)
-       snd := <-p.schan
-       return rcv, snd
-}
-
 type result struct {
        label string
        err   error
@@ -370,8 +171,9 @@ func doDisposition(ack <-chan Outcome, results chan result) {
 
 // Senders get credit immediately if receivers have prefetch set
 func TestSendReceivePrefetch(t *testing.T) {
-       pairs := newPairs(t, 1, true)
-       s, r := pairs.senderReceiver()
+       p := newPipe(t, nil, nil)
+       p.prefetch = true
+       s, r := p.sender()
        s.SendAsyncTimeout(amqp.NewMessage(), nil, nil, time.Second) // Should not block for credit.
        if _, err := r.Receive(); err != nil {
                t.Error(err)
@@ -380,8 +182,9 @@ func TestSendReceivePrefetch(t *testing.T) {
 
 // Senders do not get credit till Receive() if receivers don't have prefetch
 func TestSendReceiveNoPrefetch(t *testing.T) {
-       pairs := newPairs(t, 1, false)
-       s, r := pairs.senderReceiver()
+       p := newPipe(t, nil, nil)
+       p.prefetch = false
+       s, r := p.sender()
        done := make(chan struct{}, 1)
        go func() {
                s.SendAsyncTimeout(amqp.NewMessage(), nil, nil, time.Second) // Should block for credit.
@@ -402,14 +205,14 @@ func TestSendReceiveNoPrefetch(t *testing.T) {
 // Test that closing Links interrupts blocked link functions.
 func TestLinkCloseInterrupt(t *testing.T) {
        want := amqp.Error{Name: "x", Description: "all bad"}
-       pairs := newPairs(t, 1, false)
+       p := newPipe(t, nil, nil)
        results := make(chan result) // Collect expected errors
 
        // Note closing the link does not interrupt Send() calls, the AMQP spec says
        // that deliveries can be settled after the link is closed.
 
        // Receiver.Close() interrupts Receive()
-       snd, rcv := pairs.senderReceiver()
+       snd, rcv := p.sender()
        go doReceive(rcv, results)
        rcv.Close(want)
        if r := <-results; want != r.err {
@@ -417,7 +220,7 @@ func TestLinkCloseInterrupt(t *testing.T) {
        }
 
        // Remote Sender.Close() interrupts Receive()
-       snd, rcv = pairs.senderReceiver()
+       snd, rcv = p.sender()
        go doReceive(rcv, results)
        snd.Close(want)
        if r := <-results; want != r.err {
@@ -428,27 +231,28 @@ func TestLinkCloseInterrupt(t *testing.T) {
 // Test closing the server end of a connection.
 func TestConnectionCloseInterrupt1(t *testing.T) {
        want := amqp.Error{Name: "x", Description: "bad"}
-       pairs := newPairs(t, 1, true)
+       p := newSocketPair(t, nil, nil)
+       p.prefetch = true
        results := make(chan result) // Collect expected errors
 
        // Connection.Close() interrupts Send, Receive, Disposition.
-       snd, rcv := pairs.senderReceiver()
+       snd, rcv := p.sender()
        go doSend(snd, results)
 
        if _, err := rcv.Receive(); err != nil {
                t.Error("receive", err)
        }
-       rcv, snd = pairs.receiverSender()
+       rcv, snd = p.receiver()
        go doReceive(rcv, results)
 
-       snd, rcv = pairs.senderReceiver()
+       snd, rcv = p.sender()
        ack := snd.SendWaitable(amqp.NewMessage())
        if _, err := rcv.Receive(); err != nil {
                t.Error("receive", err)
        }
        go doDisposition(ack, results)
 
-       pairs.server.Close(want)
+       p.server.Close(want)
        for i := 0; i < 3; i++ {
                if r := <-results; want != r.err {
                        t.Errorf("want %v got %v", want, r)
@@ -459,24 +263,25 @@ func TestConnectionCloseInterrupt1(t *testing.T) {
 // Test closing the client end of the connection.
 func TestConnectionCloseInterrupt2(t *testing.T) {
        want := amqp.Error{Name: "x", Description: "bad"}
-       pairs := newPairs(t, 1, true)
+       p := newSocketPair(t, nil, nil)
+       p.prefetch = true
        results := make(chan result) // Collect expected errors
 
        // Connection.Close() interrupts Send, Receive, Disposition.
-       snd, rcv := pairs.senderReceiver()
+       snd, rcv := p.sender()
        go doSend(snd, results)
        if _, err := rcv.Receive(); err != nil {
                t.Error("receive", err)
        }
 
-       rcv, snd = pairs.receiverSender()
+       rcv, snd = p.receiver()
        go doReceive(rcv, results)
 
-       snd, rcv = pairs.senderReceiver()
+       snd, rcv = p.sender()
        ack := snd.SendWaitable(amqp.NewMessage())
        go doDisposition(ack, results)
 
-       pairs.client.Connection().Close(want)
+       p.client.Connection().Close(want)
        for i := 0; i < 3; i++ {
                if r := <-results; want != r.err {
                        t.Errorf("want %v got %v", want, r.err)
@@ -484,63 +289,43 @@ func TestConnectionCloseInterrupt2(t *testing.T) {
        }
 }
 
-func heartbeat(c Connection) time.Duration {
-       return c.(*connection).engine.Transport().RemoteIdleTimeout()
-}
-
 func TestHeartbeat(t *testing.T) {
-       client, server := newClientServerOpts(t,
-               []ConnectionOption{Heartbeat(102 * time.Millisecond)},
-               nil)
-       defer closeClientServer(client, server)
-
-       var serverHeartbeat time.Duration
-
-       go func() {
-               for in := range server.Incoming() {
-                       switch in := in.(type) {
-                       case *IncomingConnection:
-                               serverHeartbeat = in.Heartbeat()
-                               in.AcceptConnection(Heartbeat(101 * time.Millisecond))
-                       default:
-                               in.Accept()
-                       }
-               }
-       }()
+       p := newSocketPair(t,
+               []ConnectionOption{Heartbeat(12 * time.Millisecond)},
+               []ConnectionOption{Heartbeat(11 * time.Millisecond)})
+       defer func() { p.close() }()
 
-       // Freeze the server to stop it sending heartbeats.
+       // Function to freeze the server to stop it sending heartbeats.
        unfreeze := make(chan bool)
        defer close(unfreeze)
-       freeze := func() error { return server.(*connection).engine.Inject(func() { <-unfreeze }) }
+       freeze := func() error { return p.server.(*connection).engine.Inject(func() { <-unfreeze }) }
 
-       fatalIf(t, client.Sync())
-       errorIf(t, checkEqual(101*time.Millisecond, heartbeat(client.Connection())))
-       errorIf(t, checkEqual(102*time.Millisecond, serverHeartbeat))
-       errorIf(t, client.Connection().Error())
+       fatalIf(t, p.client.Sync())
+       errorIf(t, checkEqual(11*time.Millisecond, p.client.Connection().Heartbeat()))
+       errorIf(t, checkEqual(12*time.Millisecond, p.server.Heartbeat()))
 
        // Freeze the server for less than a heartbeat
        fatalIf(t, freeze())
-       time.Sleep(50 * time.Millisecond)
+       time.Sleep(5 * time.Millisecond)
        unfreeze <- true
        // Make sure server is still responding.
-       s, err := client.Sender()
-       errorIf(t, err)
+       s, _ := p.sender()
        errorIf(t, s.Sync())
 
-       // Freeze the server till the client times out the connection
+       // Freeze the server till the p.client times out the connection
        fatalIf(t, freeze())
        select {
-       case <-client.Done():
-               if amqp.ResourceLimitExceeded != client.Error().(amqp.Error).Name {
-                       t.Error("bad timeout error:", client.Error())
+       case <-p.client.Done():
+               if amqp.ResourceLimitExceeded != p.client.Error().(amqp.Error).Name {
+                       t.Error("bad timeout error:", p.client.Error())
                }
        case <-time.After(400 * time.Millisecond):
                t.Error("connection failed to time out")
        }
 
        unfreeze <- true // Unfreeze the server
-       <-server.Done()
-       if amqp.ResourceLimitExceeded != server.Error().(amqp.Error).Name {
-               t.Error("bad timeout error:", server.Error())
+       <-p.server.Done()
+       if amqp.ResourceLimitExceeded != p.server.Error().(amqp.Error).Name {
+               t.Error("bad timeout error:", p.server.Error())
        }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to