PROTON-1910: [go] proton.Link allow sending/receiving message as bytes
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/ef716fa0 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/ef716fa0 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/ef716fa0 Branch: refs/heads/master Commit: ef716fa008e709efbab6ed64edfa53f0361c3262 Parents: 2a84494 Author: Alan Conway <acon...@redhat.com> Authored: Thu Oct 11 15:19:00 2018 -0400 Committer: Alan Conway <acon...@redhat.com> Committed: Thu Oct 11 15:19:00 2018 -0400 ---------------------------------------------------------------------- go/src/qpid.apache.org/proton/message.go | 42 ++++++++++++++++++++------- 1 file changed, 32 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ef716fa0/go/src/qpid.apache.org/proton/message.go ---------------------------------------------------------------------- diff --git a/go/src/qpid.apache.org/proton/message.go b/go/src/qpid.apache.org/proton/message.go index fbb1d48..7c166a5 100644 --- a/go/src/qpid.apache.org/proton/message.go +++ b/go/src/qpid.apache.org/proton/message.go @@ -26,9 +26,10 @@ import "C" import ( "fmt" - "qpid.apache.org/amqp" "strconv" "sync/atomic" + + "qpid.apache.org/amqp" ) // HasMessage is true if all message data is available. @@ -41,7 +42,23 @@ func (d Delivery) HasMessage() bool { return !d.IsNil() && d.Readable() && !d.Pa // handling an MMessage event is always a safe context to call this function. // // Will return an error if message is incomplete or not current. -func (delivery Delivery) Message() (m amqp.Message, err error) { +func (delivery Delivery) Message() (amqp.Message, error) { + var err error + if bytes, err := delivery.MessageBytes(); err == nil { + m := amqp.NewMessage() + err = m.Decode(bytes) + return m, err + } + return nil, err +} + +// MessageBytes extracts the raw message bytes contained in a delivery. 
+// +// Must be called in the correct link context with this delivery as the current message, +// handling an MMessage event is always a safe context to call this function. +// +// Will return an error if message is incomplete or not current. +func (delivery Delivery) MessageBytes() ([]byte, error) { if !delivery.Readable() { return nil, fmt.Errorf("delivery is not readable") } @@ -53,9 +70,7 @@ func (delivery Delivery) Message() (m amqp.Message, err error) { if result != len(data) { return nil, fmt.Errorf("cannot receive message: %s", PnErrorCode(result)) } - m = amqp.NewMessage() - err = m.Decode(data) - return + return data, nil } // Process-wide atomic counter for generating tag names @@ -68,15 +83,22 @@ func nextTag() string { // Send sends a amqp.Message over a Link. // Returns a Delivery that can be use to determine the outcome of the message. func (link Link) Send(m amqp.Message) (Delivery, error) { + var err error + if bytes, err := m.Encode(nil); err == nil { + if d, err := link.SendMessageBytes(bytes); err == nil { + return d, err + } + } + return Delivery{}, err +} + +// SendMessageBytes sends encoded bytes of an amqp.Message over a Link. +// Returns a Delivery that can be used to determine the outcome of the message. +func (link Link) SendMessageBytes(bytes []byte) (Delivery, error) { if !link.IsSender() { return Delivery{}, fmt.Errorf("attempt to send message on receiving link") } - delivery := link.Delivery(nextTag()) - bytes, err := m.Encode(nil) - if err != nil { - return Delivery{}, fmt.Errorf("cannot send message %s", err) - } result := link.SendBytes(bytes) link.Advance() if result != len(bytes) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org