PROTON-1910: [go] proton.Link allow sending/receiving message as bytes

Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/ef716fa0
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/ef716fa0
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/ef716fa0

Branch: refs/heads/master
Commit: ef716fa008e709efbab6ed64edfa53f0361c3262
Parents: 2a84494
Author: Alan Conway <acon...@redhat.com>
Authored: Thu Oct 11 15:19:00 2018 -0400
Committer: Alan Conway <acon...@redhat.com>
Committed: Thu Oct 11 15:19:00 2018 -0400

----------------------------------------------------------------------
 go/src/qpid.apache.org/proton/message.go | 42 ++++++++++++++++++++-------
 1 file changed, 32 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ef716fa0/go/src/qpid.apache.org/proton/message.go
----------------------------------------------------------------------
diff --git a/go/src/qpid.apache.org/proton/message.go b/go/src/qpid.apache.org/proton/message.go
index fbb1d48..7c166a5 100644
--- a/go/src/qpid.apache.org/proton/message.go
+++ b/go/src/qpid.apache.org/proton/message.go
@@ -26,9 +26,10 @@ import "C"
 
 import (
        "fmt"
-       "qpid.apache.org/amqp"
        "strconv"
        "sync/atomic"
+
+       "qpid.apache.org/amqp"
 )
 
 // HasMessage is true if all message data is available.
@@ -41,7 +42,23 @@ func (d Delivery) HasMessage() bool { return !d.IsNil() && d.Readable() && !d.Pa
 // handling an MMessage event is always a safe context to call this function.
 //
 // Will return an error if message is incomplete or not current.
-func (delivery Delivery) Message() (m amqp.Message, err error) {
+func (delivery Delivery) Message() (amqp.Message, error) {
+       var err error
+       if bytes, err := delivery.MessageBytes(); err == nil {
+               m := amqp.NewMessage()
+               err = m.Decode(bytes)
+               return m, err
+       }
+       return nil, err
+}
+
+// MessageBytes extracts the raw message bytes contained in a delivery.
+//
+// Must be called in the correct link context with this delivery as the current message,
+// handling an MMessage event is always a safe context to call this function.
+//
+// Will return an error if message is incomplete or not current.
+func (delivery Delivery) MessageBytes() ([]byte, error) {
        if !delivery.Readable() {
                return nil, fmt.Errorf("delivery is not readable")
        }
@@ -53,9 +70,7 @@ func (delivery Delivery) Message() (m amqp.Message, err error) {
        if result != len(data) {
                return nil, fmt.Errorf("cannot receive message: %s", PnErrorCode(result))
        }
-       m = amqp.NewMessage()
-       err = m.Decode(data)
-       return
+       return data, nil
 }
 
 // Process-wide atomic counter for generating tag names
@@ -68,15 +83,22 @@ func nextTag() string {
 // Send sends a amqp.Message over a Link.
 // Returns a Delivery that can be use to determine the outcome of the message.
 func (link Link) Send(m amqp.Message) (Delivery, error) {
+       var err error
+       if bytes, err := m.Encode(nil); err == nil {
+               if d, err := link.SendMessageBytes(bytes); err == nil {
+                       return d, err
+               }
+       }
+       return Delivery{}, err
+}
+
+// SendMessageBytes sends encoded bytes of an amqp.Message over a Link.
+// Returns a Delivery that can be used to determine the outcome of the message.
+func (link Link) SendMessageBytes(bytes []byte) (Delivery, error) {
        if !link.IsSender() {
                return Delivery{}, fmt.Errorf("attempt to send message on receiving link")
        }
-
        delivery := link.Delivery(nextTag())
-       bytes, err := m.Encode(nil)
-       if err != nil {
-               return Delivery{}, fmt.Errorf("cannot send message %s", err)
-       }
        result := link.SendBytes(bytes)
        link.Advance()
        if result != len(bytes) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to