PROTON-1910: [go] test refactor and benchmarks - Simplify common test tools, move to common_test.go - Use net.Pipe for most tests, more efficient than a full network socket - Added simple benchmarks
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/d9b4b989 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/d9b4b989 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/d9b4b989 Branch: refs/heads/master Commit: d9b4b9893412676e947d693a7d4e336fad4d4110 Parents: 486fbaf Author: Alan Conway <acon...@redhat.com> Authored: Fri Sep 21 00:51:45 2018 -0400 Committer: Alan Conway <acon...@redhat.com> Committed: Thu Oct 11 12:32:59 2018 -0400 ---------------------------------------------------------------------- go/src/qpid.apache.org/amqp/message_test.go | 138 ++++++-- go/src/qpid.apache.org/electron/auth_test.go | 39 +-- .../qpid.apache.org/electron/benchmark_test.go | 132 ++++++++ go/src/qpid.apache.org/electron/common_test.go | 148 ++++++++ .../qpid.apache.org/electron/electron_test.go | 337 ++++--------------- 5 files changed, 459 insertions(+), 335 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d9b4b989/go/src/qpid.apache.org/amqp/message_test.go ---------------------------------------------------------------------- diff --git a/go/src/qpid.apache.org/amqp/message_test.go b/go/src/qpid.apache.org/amqp/message_test.go index 663e82f..b22c60d 100644 --- a/go/src/qpid.apache.org/amqp/message_test.go +++ b/go/src/qpid.apache.org/amqp/message_test.go @@ -20,6 +20,7 @@ under the License. 
package amqp import ( + "reflect" "testing" "time" ) @@ -28,9 +29,7 @@ func roundTrip(m Message) error { var err error if buffer, err := m.Encode(nil); err == nil { if m2, err := DecodeMessage(buffer); err == nil { - if err = checkEqual(m, m2); err == nil { - err = checkEqual(m.String(), m2.String()) - } + err = checkEqual(m, m2) } } return err @@ -38,35 +37,40 @@ func roundTrip(m Message) error { func TestDefaultMessage(t *testing.T) { m := NewMessage() + mv := reflect.ValueOf(m) // Check defaults - for _, data := range [][]interface{}{ - {m.Inferred(), false}, - {m.Durable(), false}, - {m.Priority(), uint8(4)}, - {m.TTL(), time.Duration(0)}, - {m.UserId(), ""}, - {m.Address(), ""}, - {m.Subject(), ""}, - {m.ReplyTo(), ""}, - {m.ContentType(), ""}, - {m.ContentEncoding(), ""}, - {m.GroupId(), ""}, - {m.GroupSequence(), int32(0)}, - {m.ReplyToGroupId(), ""}, - {m.MessageId(), nil}, - {m.CorrelationId(), nil}, - {m.DeliveryAnnotations(), map[AnnotationKey]interface{}(nil)}, - {m.MessageAnnotations(), map[AnnotationKey]interface{}(nil)}, - {m.ApplicationProperties(), map[string]interface{}(nil)}, + for _, x := range []struct { + method string + want interface{} + }{ + {"Inferred", false}, + {"Durable", false}, + {"Priority", uint8(4)}, + {"TTL", time.Duration(0)}, + {"UserId", ""}, + {"Address", ""}, + {"Subject", ""}, + {"ReplyTo", ""}, + {"ContentType", ""}, + {"ContentEncoding", ""}, + {"GroupId", ""}, + {"GroupSequence", int32(0)}, + {"ReplyToGroupId", ""}, + {"MessageId", nil}, + {"CorrelationId", nil}, + {"DeliveryAnnotations", map[AnnotationKey]interface{}(nil)}, + {"MessageAnnotations", map[AnnotationKey]interface{}(nil)}, + {"ApplicationProperties", map[string]interface{}(nil)}, // Deprecated - {m.Instructions(), map[string]interface{}(nil)}, - {m.Annotations(), map[string]interface{}(nil)}, - {m.Properties(), map[string]interface{}(nil)}, - {m.Body(), nil}, + {"Instructions", map[string]interface{}(nil)}, + {"Annotations", map[string]interface{}(nil)}, + 
{"Properties", map[string]interface{}(nil)}, + {"Body", nil}, } { - if err := checkEqual(data[0], data[1]); err != nil { - t.Error(err) + ret := mv.MethodByName(x.method).Call(nil) + if err := checkEqual(x.want, ret[0].Interface()); err != nil { + t.Errorf("%s: %s", x.method, err) } } if err := roundTrip(m); err != nil { @@ -84,14 +88,17 @@ func TestMessageString(t *testing.T) { m.SetDeliveryAnnotations(map[AnnotationKey]interface{}{AnnotationKeySymbol("instructions"): "foo"}) m.SetMessageAnnotations(map[AnnotationKey]interface{}{AnnotationKeySymbol("annotations"): "bar"}) m.SetApplicationProperties(map[string]interface{}{"int": int32(32)}) + if err := roundTrip(m); err != nil { + t.Error(err) + } msgstr := `Message{user_id="user", instructions={:instructions="foo"}, annotations={:annotations="bar"}, properties={"int"=32}, body="hello"}` if err := checkEqual(msgstr, m.String()); err != nil { t.Error(err) } } -func TestMessageRoundTrip(t *testing.T) { - m := NewMessage() +// Set all message properties +func setMessageProperties(m Message) Message { m.SetInferred(false) m.SetDurable(true) m.SetPriority(42) @@ -110,7 +117,25 @@ func TestMessageRoundTrip(t *testing.T) { m.SetDeliveryAnnotations(map[AnnotationKey]interface{}{AnnotationKeySymbol("instructions"): "foo"}) m.SetMessageAnnotations(map[AnnotationKey]interface{}{AnnotationKeySymbol("annotations"): "bar"}) m.SetApplicationProperties(map[string]interface{}{"int": int32(32), "bool": true}) - m.Marshal("hello") + return m +} + +func TestMessageRoundTrip(t *testing.T) { + m1 := NewMessage() + setMessageProperties(m1) + m1.Marshal("hello") + + buffer, err := m1.Encode(nil) + if err != nil { + t.Fatal(err) + } + m, err := DecodeMessage(buffer) + if err != nil { + t.Fatal(err) + } + if err = checkEqual(m1, m); err != nil { + t.Error(err) + } for _, data := range [][]interface{}{ {m.Inferred(), false}, @@ -215,3 +240,54 @@ func TestMessageBodyTypes(t *testing.T) { // TODO aconway 2015-09-08: array etc. 
} + +// Benchmarks assign to package-scope variables to prevent being optimized out. +var bmM Message +var bmBuf []byte + +func BenchmarkNewMessageEmpty(b *testing.B) { + for n := 0; n < b.N; n++ { + bmM = NewMessage() + } +} + +func BenchmarkNewMessageString(b *testing.B) { + for n := 0; n < b.N; n++ { + bmM = NewMessageWith("hello") + } +} + +func BenchmarkNewMessageAll(b *testing.B) { + for n := 0; n < b.N; n++ { + bmM = setMessageProperties(NewMessageWith("hello")) + } +} + +func BenchmarkEncode(b *testing.B) { + m := setMessageProperties(NewMessageWith("hello")) + var buf []byte + b.ResetTimer() + for n := 0; n < b.N; n++ { + buf, err := m.Encode(buf) + if err != nil { + b.Fatal(err) + } + bmBuf = buf + } +} + +func BenchmarkDecode(b *testing.B) { + var buf []byte + buf, err := setMessageProperties(NewMessageWith("hello")).Encode(buf) + if err != nil { + b.Fatal(err) + } + m := NewMessage() + b.ResetTimer() + for n := 0; n < b.N; n++ { + if err := m.Decode(buf); err != nil { + b.Fatal(err) + } + bmM = m + } +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d9b4b989/go/src/qpid.apache.org/electron/auth_test.go ---------------------------------------------------------------------- diff --git a/go/src/qpid.apache.org/electron/auth_test.go b/go/src/qpid.apache.org/electron/auth_test.go index 162b366..30ef401 100644 --- a/go/src/qpid.apache.org/electron/auth_test.go +++ b/go/src/qpid.apache.org/electron/auth_test.go @@ -29,57 +29,40 @@ import ( "testing" ) -func testAuthClientServer(t *testing.T, copts []ConnectionOption, sopts []ConnectionOption) (got connectionSettings, err error) { - client, server := newClientServerOpts(t, copts, sopts) - defer closeClientServer(client, server) - - go func() { - for in := range server.Incoming() { - switch in := in.(type) { - case *IncomingConnection: - got = connectionSettings{user: in.User(), virtualHost: in.VirtualHost()} - } - in.Accept() - } - }() - - err = client.Sync() - return -} - func TestAuthAnonymous(t 
*testing.T) { - got, err := testAuthClientServer(t, + p := newPipe(t, []ConnectionOption{User("fred"), VirtualHost("vhost"), SASLAllowInsecure(true)}, []ConnectionOption{SASLAllowedMechs("ANONYMOUS"), SASLAllowInsecure(true)}) - fatalIf(t, err) - errorIf(t, checkEqual(connectionSettings{user: "anonymous", virtualHost: "vhost"}, got)) + fatalIf(t, p.server.Sync()) + errorIf(t, checkEqual("anonymous", p.server.User())) + errorIf(t, checkEqual("vhost", p.server.VirtualHost())) } func TestAuthPlain(t *testing.T) { extendedSASL.startTest(t) - got, err := testAuthClientServer(t, + p := newPipe(t, []ConnectionOption{SASLAllowInsecure(true), SASLAllowedMechs("PLAIN"), User("fred@proton"), Password([]byte("xxx"))}, []ConnectionOption{SASLAllowInsecure(true), SASLAllowedMechs("PLAIN")}) - fatalIf(t, err) - errorIf(t, checkEqual(connectionSettings{user: "fred@proton"}, got)) + fatalIf(t, p.server.Sync()) + errorIf(t, checkEqual("fred@proton", p.server.User())) } func TestAuthBadPass(t *testing.T) { extendedSASL.startTest(t) - _, err := testAuthClientServer(t, + p := newPipe(t, []ConnectionOption{SASLAllowInsecure(true), SASLAllowedMechs("PLAIN"), User("fred@proton"), Password([]byte("yyy"))}, []ConnectionOption{SASLAllowInsecure(true), SASLAllowedMechs("PLAIN")}) - if err == nil { + if p.server.Sync() == nil { t.Error("Expected auth failure for bad pass") } } func TestAuthBadUser(t *testing.T) { extendedSASL.startTest(t) - _, err := testAuthClientServer(t, + p := newPipe(t, []ConnectionOption{SASLAllowInsecure(true), SASLAllowedMechs("PLAIN"), User("foo@bar"), Password([]byte("yyy"))}, []ConnectionOption{SASLAllowInsecure(true), SASLAllowedMechs("PLAIN")}) - if err == nil { + if p.server.Sync() == nil { t.Error("Expected auth failure for bad user") } } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d9b4b989/go/src/qpid.apache.org/electron/benchmark_test.go ---------------------------------------------------------------------- diff --git 
a/go/src/qpid.apache.org/electron/benchmark_test.go b/go/src/qpid.apache.org/electron/benchmark_test.go new file mode 100644 index 0000000..ae9d47c --- /dev/null +++ b/go/src/qpid.apache.org/electron/benchmark_test.go @@ -0,0 +1,132 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package electron + +import ( + "flag" + "strings" + "sync" + "testing" + + "qpid.apache.org/amqp" +) + +// To change capacity use +// go test -bench=. 
-args -capacity 100 +var capacity = flag.Int("capacity", 1000, "Prefetch capacity") +var bodySize = flag.Int("bodySize", 1000, "Message body size") + +type bmCommon struct { + b *testing.B + p *pair + s Sender + r Receiver + ack chan Outcome + done sync.WaitGroup +} + +func makeBmCommon(p *pair, waitCount int) bmCommon { + bm := bmCommon{p: p, b: p.t.(*testing.B)} + bm.p.capacity = *capacity + bm.p.prefetch = true + bm.s, bm.r = p.sender() + bm.ack = make(chan Outcome, *capacity) + bm.done.Add(waitCount) + bm.b.ResetTimer() + return bm +} + +func (bm *bmCommon) receiveAccept() { + defer bm.done.Done() + for n := 0; n < bm.b.N; n++ { + if rm, err := bm.r.Receive(); err != nil { + bm.b.Fatal(err) + } else { + fatalIf(bm.b, rm.Accept()) + } + } +} + +func (bm *bmCommon) outcomes() { + defer bm.done.Done() + for n := 0; n < bm.b.N; n++ { + fatalIf(bm.b, (<-bm.ack).Error) + } +} + +var emptyMsg = amqp.NewMessage() + +func BenchmarkSendForget(b *testing.B) { + bm := makeBmCommon(newPipe(b, nil, nil), 1) + defer bm.p.close() + + go func() { // Receive, no ack + defer bm.done.Done() + for n := 0; n < b.N; n++ { + if _, err := bm.r.Receive(); err != nil { + b.Fatal(err) + } + } + }() + + for n := 0; n < b.N; n++ { + bm.s.SendForget(emptyMsg) + } + bm.done.Wait() +} + +func BenchmarkSendSync(b *testing.B) { + bm := makeBmCommon(newPipe(b, nil, nil), 1) + defer bm.p.close() + + go bm.receiveAccept() + for n := 0; n < b.N; n++ { + fatalIf(b, bm.s.SendSync(emptyMsg).Error) + } + bm.done.Wait() +} + +func BenchmarkSendAsync(b *testing.B) { + bm := makeBmCommon(newPipe(b, nil, nil), 2) + defer bm.p.close() + + go bm.outcomes() // Handle outcomes + go bm.receiveAccept() // Receive + for n := 0; n < b.N; n++ { + bm.s.SendAsync(emptyMsg, bm.ack, nil) + } + bm.done.Wait() +} + +// Create a new message for each send, with body and property. 
+func BenchmarkSendAsyncNewMessage(b *testing.B) { + body := strings.Repeat("x", *bodySize) + bm := makeBmCommon(newPipe(b, nil, nil), 2) + defer bm.p.close() + + go bm.outcomes() // Handle outcomes + go bm.receiveAccept() // Receive + for n := 0; n < b.N; n++ { + msg := amqp.NewMessageWith(body) + msg.SetApplicationProperties(map[string]interface{}{"prop": "value"}) + bm.s.SendAsync(msg, bm.ack, nil) + } + bm.done.Wait() +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d9b4b989/go/src/qpid.apache.org/electron/common_test.go ---------------------------------------------------------------------- diff --git a/go/src/qpid.apache.org/electron/common_test.go b/go/src/qpid.apache.org/electron/common_test.go new file mode 100644 index 0000000..3aad825 --- /dev/null +++ b/go/src/qpid.apache.org/electron/common_test.go @@ -0,0 +1,148 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package electron + +import ( + "fmt" + "net" + "path" + "reflect" + "runtime" + "testing" +) + +func decorate(err error, callDepth int) string { + _, file, line, _ := runtime.Caller(callDepth + 1) // annotate with location of caller. 
+ _, file = path.Split(file) + return fmt.Sprintf("\n%s:%d: %v", file, line, err) +} + +func fatalIfN(t testing.TB, err error, callDepth int) { + if err != nil { + t.Fatal(decorate(err, callDepth+1)) + } +} + +func fatalIf(t testing.TB, err error) { + fatalIfN(t, err, 1) +} + +func errorIf(t testing.TB, err error) { + if err != nil { + t.Errorf(decorate(err, 1)) + } +} + +func checkEqual(want interface{}, got interface{}) error { + if !reflect.DeepEqual(want, got) { + return fmt.Errorf("(%#v != %#v)", want, got) + } + return nil +} + +// AMQP client/server pair +type pair struct { + t testing.TB + client Session + server Connection + capacity int + prefetch bool + rchan chan Receiver + schan chan Sender + auth connectionSettings +} + +func newPair(t testing.TB, cli, srv net.Conn, clientOpts, serverOpts []ConnectionOption) *pair { + opts := append([]ConnectionOption{Server()}, serverOpts...) + sc, _ := NewConnection(srv, opts...) + opts = append([]ConnectionOption{}, clientOpts...) + cc, _ := NewConnection(cli, opts...) 
+ cs, _ := cc.Session() + p := &pair{ + t: t, + client: cs, + server: sc, + capacity: 100, + rchan: make(chan Receiver), + schan: make(chan Sender)} + + go func() { + for i := range p.server.Incoming() { + switch i := i.(type) { + case *IncomingReceiver: + if p.capacity > 0 { + i.SetCapacity(p.capacity) + } + i.SetPrefetch(p.prefetch) + p.rchan <- i.Accept().(Receiver) + break + case *IncomingSender: + p.schan <- i.Accept().(Sender) + default: + i.Accept() + } + } + }() + + return p +} + +// AMQP pair linked by in-memory pipe +func newPipe(t testing.TB, clientOpts, serverOpts []ConnectionOption) *pair { + cli, srv := net.Pipe() + return newPair(t, cli, srv, clientOpts, serverOpts) +} + +// AMQP pair linked by TCP socket +func newSocketPair(t testing.TB, clientOpts, serverOpts []ConnectionOption) *pair { + l, err := net.Listen("tcp4", ":0") // For systems with ipv6 disabled + fatalIfN(t, err, 1) + srvCh := make(chan net.Conn) + var srvErr error + go func() { + var c net.Conn + c, srvErr = l.Accept() + srvCh <- c + }() + addr := l.Addr() + cli, err := net.Dial(addr.Network(), addr.String()) + fatalIfN(t, err, 1) + srv := <-srvCh + fatalIfN(t, srvErr, 1) + return newPair(t, cli, srv, clientOpts, serverOpts) +} + +func (p *pair) close() { p.client.Connection().Close(nil); p.server.Close(nil) } + +// Return a client sender and server receiver +func (p *pair) sender(opts ...LinkOption) (Sender, Receiver) { + snd, err := p.client.Sender(opts...) + fatalIfN(p.t, err, 2) + rcv := <-p.rchan + return snd, rcv +} + +// Return a client receiver and server sender +func (p *pair) receiver(opts ...LinkOption) (Receiver, Sender) { + rcv, err := p.client.Receiver(opts...) 
+ fatalIfN(p.t, err, 2) + snd := <-p.schan + return rcv, snd +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d9b4b989/go/src/qpid.apache.org/electron/electron_test.go ---------------------------------------------------------------------- diff --git a/go/src/qpid.apache.org/electron/electron_test.go b/go/src/qpid.apache.org/electron/electron_test.go index 74759f5..c8a51c7 100644 --- a/go/src/qpid.apache.org/electron/electron_test.go +++ b/go/src/qpid.apache.org/electron/electron_test.go @@ -21,119 +21,23 @@ package electron import ( "fmt" - "net" - "path" "qpid.apache.org/amqp" - "reflect" - "runtime" "testing" "time" ) -func fatalIf(t *testing.T, err error) { - if err != nil { - _, file, line, ok := runtime.Caller(1) // annotate with location of caller. - if ok { - _, file = path.Split(file) - } - t.Fatalf("(from %s:%d) %v", file, line, err) - } -} - -func errorIf(t *testing.T, err error) { - if err != nil { - _, file, line, ok := runtime.Caller(1) // annotate with location of caller. - if ok { - _, file = path.Split(file) - } - t.Errorf("(from %s:%d) %v", file, line, err) - } -} - -func checkEqual(want interface{}, got interface{}) error { - if !reflect.DeepEqual(want, got) { - return fmt.Errorf("%#v != %#v", want, got) - } - return nil -} - -// Start a server, return listening addr and channel for incoming Connections. -func newServer(t *testing.T, cont Container, opts ...ConnectionOption) (net.Addr, <-chan Connection) { - listener, err := net.Listen("tcp4", "") // For systems with ipv6 disabled - fatalIf(t, err) - addr := listener.Addr() - ch := make(chan Connection) - go func() { - conn, err := listener.Accept() - c, err := cont.Connection(conn, append([]ConnectionOption{Server()}, opts...)...) - fatalIf(t, err) - ch <- c - }() - return addr, ch -} - -// Open a client connection and session, return the session. 
-func newClient(t *testing.T, cont Container, addr net.Addr, opts ...ConnectionOption) Session { - conn, err := net.Dial(addr.Network(), addr.String()) - fatalIf(t, err) - // Don't bother checking error here, it's an async error so it's racy to do so anyway. - // Let caller use Sync() or catch it on first use. - c, _ := cont.Connection(conn, opts...) - sn, _ := c.Session() - return sn -} - -// Return client and server ends of the same connection. -func newClientServerOpts(t *testing.T, copts []ConnectionOption, sopts []ConnectionOption) (client Session, server Connection) { - addr, ch := newServer(t, NewContainer("test-server"), sopts...) - client = newClient(t, NewContainer("test-client"), addr, copts...) - return client, <-ch -} - -// Return client and server ends of the same connection. -func newClientServer(t *testing.T) (client Session, server Connection) { - return newClientServerOpts(t, nil, nil) -} - -// Close client and server -func closeClientServer(client Session, server Connection) { - client.Connection().Close(nil) - server.Close(nil) -} - // Send a message one way with a client sender and server receiver, verify ack. 
-func TestClientSendServerReceive(t *testing.T) { +func TestClientSender(t *testing.T) { + p := newPipe(t, nil, nil) + defer func() { p.close() }() + nLinks := 3 nMessages := 3 - rchan := make(chan Receiver, nLinks) - client, server := newClientServer(t) - go func() { - for in := range server.Incoming() { - switch in := in.(type) { - case *IncomingReceiver: - in.SetCapacity(1) - in.SetPrefetch(false) - rchan <- in.Accept().(Receiver) - default: - in.Accept() - } - } - }() - - defer func() { closeClientServer(client, server) }() - s := make([]Sender, nLinks) - for i := 0; i < nLinks; i++ { - var err error - s[i], err = client.Sender(Target(fmt.Sprintf("foo%d", i))) - if err != nil { - t.Fatal(err) - } - } r := make([]Receiver, nLinks) for i := 0; i < nLinks; i++ { - r[i] = <-rchan + s[i], r[i] = p.sender(Target(fmt.Sprintf("foo%d", i))) } for i := 0; i < nLinks; i++ { @@ -146,16 +50,12 @@ func TestClientSendServerReceive(t *testing.T) { m := amqp.NewMessageWith(fmt.Sprintf("foobar%v-%v", i, j)) var err error s[i].SendAsync(m, ack, "testing") - if err != nil { - t.Fatal(err) - } + fatalIf(t, err) }() // Server receive rm, err := r[i].Receive() - if err != nil { - t.Fatal(err) - } + fatalIf(t, err) if want, got := interface{}(fmt.Sprintf("foobar%v-%v", i, j)), rm.Message.Body(); want != got { t.Errorf("%#v != %#v", want, got) } @@ -182,76 +82,28 @@ func TestClientSendServerReceive(t *testing.T) { func TestClientReceiver(t *testing.T) { nMessages := 3 - client, server := newClientServer(t) + p := newPipe(t, nil, nil) + defer func() { p.close() }() + r, s := p.receiver(Source("foo"), Capacity(nMessages), Prefetch(true)) go func() { - for in := range server.Incoming() { - switch in := in.(type) { - case *IncomingSender: - s := in.Accept().(Sender) - go func() { - for i := int32(0); i < int32(nMessages); i++ { - out := s.SendSync(amqp.NewMessageWith(i)) - if out.Error != nil { - t.Error(out.Error) - return - } - } - s.Close(nil) - }() - default: - in.Accept() - } + for i 
:= 0; i < nMessages; i++ { // Server sends + out := s.SendSync(amqp.NewMessageWith(int32(i))) + fatalIf(t, out.Error) } }() - - r, err := client.Receiver(Source("foo")) - if err != nil { - t.Fatal(err) - } - for i := int32(0); i < int32(nMessages); i++ { + for i := 0; i < nMessages; i++ { // Client receives rm, err := r.Receive() - if err != nil { - if err != Closed { - t.Error(err) - } - break - } - if err := rm.Accept(); err != nil { - t.Error(err) - } - if b, ok := rm.Message.Body().(int32); !ok || b != i { - t.Errorf("want %v, true got %v, %v", i, b, ok) - } + fatalIf(t, err) + errorIf(t, checkEqual(int32(i), rm.Message.Body())) + errorIf(t, rm.Accept()) } - server.Close(nil) - client.Connection().Close(nil) } // Test timeout versions of waiting functions. func TestTimeouts(t *testing.T) { - var err error - rchan := make(chan Receiver, 1) - client, server := newClientServer(t) - go func() { - for i := range server.Incoming() { - switch i := i.(type) { - case *IncomingReceiver: - i.SetCapacity(1) - i.SetPrefetch(false) - rchan <- i.Accept().(Receiver) // Issue credit only on receive - default: - i.Accept() - } - } - }() - defer func() { closeClientServer(client, server) }() - - // Open client sender - snd, err := client.Sender(Target("test")) - if err != nil { - t.Fatal(err) - } - rcv := <-rchan + p := newPipe(t, nil, nil) + defer func() { p.close() }() + snd, rcv := p.sender(Target("test")) // Test send with timeout short := time.Millisecond @@ -264,11 +116,11 @@ func TestTimeouts(t *testing.T) { t.Error("want Timeout got", err) } // Test receive with timeout - if _, err = rcv.ReceiveTimeout(0); err != Timeout { // No credit, expect timeout. + if _, err := rcv.ReceiveTimeout(0); err != Timeout { // No credit, expect timeout. t.Error("want Timeout got", err) } // Test receive with timeout - if _, err = rcv.ReceiveTimeout(short); err != Timeout { // No credit, expect timeout. + if _, err := rcv.ReceiveTimeout(short); err != Timeout { // No credit, expect timeout. 
t.Error("want Timeout got", err) } // There is now a credit on the link due to receive @@ -295,57 +147,6 @@ func TestTimeouts(t *testing.T) { } } -// A server that returns the opposite end of each client link via channels. -type pairs struct { - t *testing.T - client Session - server Connection - rchan chan Receiver - schan chan Sender - capacity int - prefetch bool -} - -func newPairs(t *testing.T, capacity int, prefetch bool) *pairs { - p := &pairs{t: t, rchan: make(chan Receiver, 1), schan: make(chan Sender, 1)} - p.client, p.server = newClientServer(t) - go func() { - for i := range p.server.Incoming() { - switch i := i.(type) { - case *IncomingReceiver: - i.SetCapacity(capacity) - i.SetPrefetch(prefetch) - p.rchan <- i.Accept().(Receiver) - case *IncomingSender: - p.schan <- i.Accept().(Sender) - default: - i.Accept() - } - } - }() - return p -} - -func (p *pairs) close() { - closeClientServer(p.client, p.server) -} - -// Return a client sender and server receiver -func (p *pairs) senderReceiver() (Sender, Receiver) { - snd, err := p.client.Sender() - fatalIf(p.t, err) - rcv := <-p.rchan - return snd, rcv -} - -// Return a client receiver and server sender -func (p *pairs) receiverSender() (Receiver, Sender) { - rcv, err := p.client.Receiver() - fatalIf(p.t, err) - snd := <-p.schan - return rcv, snd -} - type result struct { label string err error @@ -370,8 +171,9 @@ func doDisposition(ack <-chan Outcome, results chan result) { // Senders get credit immediately if receivers have prefetch set func TestSendReceivePrefetch(t *testing.T) { - pairs := newPairs(t, 1, true) - s, r := pairs.senderReceiver() + p := newPipe(t, nil, nil) + p.prefetch = true + s, r := p.sender() s.SendAsyncTimeout(amqp.NewMessage(), nil, nil, time.Second) // Should not block for credit. 
if _, err := r.Receive(); err != nil { t.Error(err) @@ -380,8 +182,9 @@ func TestSendReceivePrefetch(t *testing.T) { // Senders do not get credit till Receive() if receivers don't have prefetch func TestSendReceiveNoPrefetch(t *testing.T) { - pairs := newPairs(t, 1, false) - s, r := pairs.senderReceiver() + p := newPipe(t, nil, nil) + p.prefetch = false + s, r := p.sender() done := make(chan struct{}, 1) go func() { s.SendAsyncTimeout(amqp.NewMessage(), nil, nil, time.Second) // Should block for credit. @@ -402,14 +205,14 @@ func TestSendReceiveNoPrefetch(t *testing.T) { // Test that closing Links interrupts blocked link functions. func TestLinkCloseInterrupt(t *testing.T) { want := amqp.Error{Name: "x", Description: "all bad"} - pairs := newPairs(t, 1, false) + p := newPipe(t, nil, nil) results := make(chan result) // Collect expected errors // Note closing the link does not interrupt Send() calls, the AMQP spec says // that deliveries can be settled after the link is closed. // Receiver.Close() interrupts Receive() - snd, rcv := pairs.senderReceiver() + snd, rcv := p.sender() go doReceive(rcv, results) rcv.Close(want) if r := <-results; want != r.err { @@ -417,7 +220,7 @@ func TestLinkCloseInterrupt(t *testing.T) { } // Remote Sender.Close() interrupts Receive() - snd, rcv = pairs.senderReceiver() + snd, rcv = p.sender() go doReceive(rcv, results) snd.Close(want) if r := <-results; want != r.err { @@ -428,27 +231,28 @@ func TestLinkCloseInterrupt(t *testing.T) { // Test closing the server end of a connection. func TestConnectionCloseInterrupt1(t *testing.T) { want := amqp.Error{Name: "x", Description: "bad"} - pairs := newPairs(t, 1, true) + p := newSocketPair(t, nil, nil) + p.prefetch = true results := make(chan result) // Collect expected errors // Connection.Close() interrupts Send, Receive, Disposition. 
- snd, rcv := pairs.senderReceiver() + snd, rcv := p.sender() go doSend(snd, results) if _, err := rcv.Receive(); err != nil { t.Error("receive", err) } - rcv, snd = pairs.receiverSender() + rcv, snd = p.receiver() go doReceive(rcv, results) - snd, rcv = pairs.senderReceiver() + snd, rcv = p.sender() ack := snd.SendWaitable(amqp.NewMessage()) if _, err := rcv.Receive(); err != nil { t.Error("receive", err) } go doDisposition(ack, results) - pairs.server.Close(want) + p.server.Close(want) for i := 0; i < 3; i++ { if r := <-results; want != r.err { t.Errorf("want %v got %v", want, r) @@ -459,24 +263,25 @@ func TestConnectionCloseInterrupt1(t *testing.T) { // Test closing the client end of the connection. func TestConnectionCloseInterrupt2(t *testing.T) { want := amqp.Error{Name: "x", Description: "bad"} - pairs := newPairs(t, 1, true) + p := newSocketPair(t, nil, nil) + p.prefetch = true results := make(chan result) // Collect expected errors // Connection.Close() interrupts Send, Receive, Disposition. 
- snd, rcv := pairs.senderReceiver() + snd, rcv := p.sender() go doSend(snd, results) if _, err := rcv.Receive(); err != nil { t.Error("receive", err) } - rcv, snd = pairs.receiverSender() + rcv, snd = p.receiver() go doReceive(rcv, results) - snd, rcv = pairs.senderReceiver() + snd, rcv = p.sender() ack := snd.SendWaitable(amqp.NewMessage()) go doDisposition(ack, results) - pairs.client.Connection().Close(want) + p.client.Connection().Close(want) for i := 0; i < 3; i++ { if r := <-results; want != r.err { t.Errorf("want %v got %v", want, r.err) @@ -484,63 +289,43 @@ func TestConnectionCloseInterrupt2(t *testing.T) { } } -func heartbeat(c Connection) time.Duration { - return c.(*connection).engine.Transport().RemoteIdleTimeout() -} - func TestHeartbeat(t *testing.T) { - client, server := newClientServerOpts(t, - []ConnectionOption{Heartbeat(102 * time.Millisecond)}, - nil) - defer closeClientServer(client, server) - - var serverHeartbeat time.Duration - - go func() { - for in := range server.Incoming() { - switch in := in.(type) { - case *IncomingConnection: - serverHeartbeat = in.Heartbeat() - in.AcceptConnection(Heartbeat(101 * time.Millisecond)) - default: - in.Accept() - } - } - }() + p := newSocketPair(t, + []ConnectionOption{Heartbeat(12 * time.Millisecond)}, + []ConnectionOption{Heartbeat(11 * time.Millisecond)}) + defer func() { p.close() }() - // Freeze the server to stop it sending heartbeats. + // Function to freeze the server to stop it sending heartbeats. 
unfreeze := make(chan bool) defer close(unfreeze) - freeze := func() error { return server.(*connection).engine.Inject(func() { <-unfreeze }) } + freeze := func() error { return p.server.(*connection).engine.Inject(func() { <-unfreeze }) } - fatalIf(t, client.Sync()) - errorIf(t, checkEqual(101*time.Millisecond, heartbeat(client.Connection()))) - errorIf(t, checkEqual(102*time.Millisecond, serverHeartbeat)) - errorIf(t, client.Connection().Error()) + fatalIf(t, p.client.Sync()) + errorIf(t, checkEqual(11*time.Millisecond, p.client.Connection().Heartbeat())) + errorIf(t, checkEqual(12*time.Millisecond, p.server.Heartbeat())) // Freeze the server for less than a heartbeat fatalIf(t, freeze()) - time.Sleep(50 * time.Millisecond) + time.Sleep(5 * time.Millisecond) unfreeze <- true // Make sure server is still responding. - s, err := client.Sender() - errorIf(t, err) + s, _ := p.sender() errorIf(t, s.Sync()) - // Freeze the server till the client times out the connection + // Freeze the server till the p.client times out the connection fatalIf(t, freeze()) select { - case <-client.Done(): - if amqp.ResourceLimitExceeded != client.Error().(amqp.Error).Name { - t.Error("bad timeout error:", client.Error()) + case <-p.client.Done(): + if amqp.ResourceLimitExceeded != p.client.Error().(amqp.Error).Name { + t.Error("bad timeout error:", p.client.Error()) } case <-time.After(400 * time.Millisecond): t.Error("connection failed to time out") } unfreeze <- true // Unfreeze the server - <-server.Done() - if amqp.ResourceLimitExceeded != server.Error().(amqp.Error).Name { - t.Error("bad timeout error:", server.Error()) + <-p.server.Done() + if amqp.ResourceLimitExceeded != p.server.Error().(amqp.Error).Name { + t.Error("bad timeout error:", p.server.Error()) } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org