PROTON-1910: [go] native Message implementation

Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/886d2b93
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/886d2b93
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/886d2b93

Branch: refs/heads/master
Commit: 886d2b9349f74c02219b29990ee04278124f5224
Parents: ef716fa
Author: Alan Conway <acon...@redhat.com>
Authored: Thu Oct 11 15:20:23 2018 -0400
Committer: Alan Conway <acon...@redhat.com>
Committed: Thu Oct 11 15:20:23 2018 -0400

----------------------------------------------------------------------
 go/src/qpid.apache.org/amqp/message.go      | 521 +++++++++++++++--------
 go/src/qpid.apache.org/amqp/message_test.go |  10 +-
 2 files changed, 355 insertions(+), 176 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/886d2b93/go/src/qpid.apache.org/amqp/message.go
----------------------------------------------------------------------
diff --git a/go/src/qpid.apache.org/amqp/message.go 
b/go/src/qpid.apache.org/amqp/message.go
index e514b26..919904c 100644
--- a/go/src/qpid.apache.org/amqp/message.go
+++ b/go/src/qpid.apache.org/amqp/message.go
@@ -35,10 +35,10 @@ package amqp
 import "C"
 
 import (
+       "bytes"
        "fmt"
-       "runtime"
+       "reflect"
        "time"
-       "unsafe"
 )
 
 // Message is the interface to an AMQP message.
@@ -124,11 +124,12 @@ type Message interface {
 
        // Per-delivery annotations to provide delivery instructions.
        // May be added or removed by intermediaries during delivery.
+       // See ApplicationProperties() for properties set by the application.
        DeliveryAnnotations() map[AnnotationKey]interface{}
        SetDeliveryAnnotations(map[AnnotationKey]interface{})
 
        // Message annotations added as part of the bare message at creation, 
usually
-       // by an AMQP library. See ApplicationProperties() for adding 
application data.
+       // by an AMQP library. See ApplicationProperties() for properties set 
by the application.
        MessageAnnotations() map[AnnotationKey]interface{}
        SetMessageAnnotations(map[AnnotationKey]interface{})
 
@@ -141,15 +142,18 @@ type Message interface {
        Inferred() bool
        SetInferred(bool)
 
-       // Marshal a Go value into the message body. See amqp.Marshal() for 
details.
+       // Get the message body, using the amqp.Unmarshal() rules for 
interface{}
+       Body() interface{}
+
+       // Set the body using amqp.Marshal()
+       SetBody(interface{})
+
+       // Marshal a Go value into the message body, synonym for SetBody()
        Marshal(interface{})
 
-       // Unmarshal the message body into the value pointed to by v. See 
amqp.Unmarshal() for details.
+       // Unmarshal the message body into the value pointed to by v. See 
amqp.Unmarshal()
        Unmarshal(interface{})
 
-       // Body value resulting from the default unmarshaling of message body 
as interface{}
-       Body() interface{}
-
        // Encode encodes the message as AMQP data. If buffer is non-nil and is 
large enough
        // the message is encoded into it, otherwise a new buffer is created.
        // Returns the buffer containing the message.
@@ -158,7 +162,7 @@ type Message interface {
        // Decode data into this message. Overwrites an existing message 
content.
        Decode(buffer []byte) error
 
-       // Clear the message contents.
+       // Clear the message contents, set all fields to the default value.
        Clear()
 
        // Copy the contents of another message to this one.
@@ -180,194 +184,205 @@ type Message interface {
        String() string
 }
 
-type message struct{ pn *C.pn_message_t }
-
-func freeMessage(m *message) {
-       C.pn_message_free(m.pn)
-       m.pn = nil
-}
-
 // NewMessage creates a new message instance.
 func NewMessage() Message {
-       m := &message{C.pn_message()}
-       runtime.SetFinalizer(m, freeMessage)
+       m := &message{}
+       m.Clear() // apply the defaults; the default priority is not the zero value
        return m
 }
 
-// NewMessageWith creates a message with value as the body. Equivalent to
-//     m := NewMessage(); m.Marshal(body)
+// NewMessageWith creates a message with value as the body, equivalent to NewMessage() followed by SetBody(value).
 func NewMessageWith(value interface{}) Message {
        m := NewMessage()
-       m.Marshal(value)
+       m.SetBody(value)
        return m
 }
 
-func (m *message) Clear() { C.pn_message_clear(m.pn) }
-
-func (m *message) Copy(x Message) error {
-       if data, err := x.Encode(nil); err == nil {
-               return m.Decode(data)
-       } else {
-               return err
-       }
-}
-
-// ==== message get functions
-
-func rewindGet(data *C.pn_data_t) (v interface{}) {
-       C.pn_data_rewind(data)
-       C.pn_data_next(data)
-       unmarshal(&v, data)
-       return v
-}
-
-func (m *message) Inferred() bool  { return 
bool(C.pn_message_is_inferred(m.pn)) }
-func (m *message) Durable() bool   { return 
bool(C.pn_message_is_durable(m.pn)) }
-func (m *message) Priority() uint8 { return 
uint8(C.pn_message_get_priority(m.pn)) }
-func (m *message) TTL() time.Duration {
-       return time.Duration(C.pn_message_get_ttl(m.pn)) * time.Millisecond
+// NewMessageCopy creates a new message and copies the contents of m into it.
+func NewMessageCopy(m Message) Message {
+       m2 := NewMessage()
+       m2.Copy(m) // NOTE(review): the error returned by Copy is discarded here
+       return m2
 }
-func (m *message) FirstAcquirer() bool        { return 
bool(C.pn_message_is_first_acquirer(m.pn)) }
-func (m *message) DeliveryCount() uint32      { return 
uint32(C.pn_message_get_delivery_count(m.pn)) }
-func (m *message) MessageId() interface{}     { return 
rewindGet(C.pn_message_id(m.pn)) }
-func (m *message) UserId() string             { return 
goString(C.pn_message_get_user_id(m.pn)) }
-func (m *message) Address() string            { return 
C.GoString(C.pn_message_get_address(m.pn)) }
-func (m *message) Subject() string            { return 
C.GoString(C.pn_message_get_subject(m.pn)) }
-func (m *message) ReplyTo() string            { return 
C.GoString(C.pn_message_get_reply_to(m.pn)) }
-func (m *message) CorrelationId() interface{} { return 
rewindGet(C.pn_message_correlation_id(m.pn)) }
-func (m *message) ContentType() string        { return 
C.GoString(C.pn_message_get_content_type(m.pn)) }
-func (m *message) ContentEncoding() string    { return 
C.GoString(C.pn_message_get_content_encoding(m.pn)) }
 
-func (m *message) ExpiryTime() time.Time {
-       return time.Unix(0, 
int64(time.Millisecond*time.Duration(C.pn_message_get_expiry_time(m.pn))))
-}
-func (m *message) CreationTime() time.Time {
-       return time.Unix(0, 
int64(time.Millisecond)*int64(C.pn_message_get_creation_time(m.pn)))
-}
-func (m *message) GroupId() string        { return 
C.GoString(C.pn_message_get_group_id(m.pn)) }
-func (m *message) GroupSequence() int32   { return 
int32(C.pn_message_get_group_sequence(m.pn)) }
-func (m *message) ReplyToGroupId() string { return 
C.GoString(C.pn_message_get_reply_to_group_id(m.pn)) }
+// Clear resets every field of the message to its default value; the default priority is 4.
+func (m *message) Clear() { *m = message{priority: 4} }
 
-func getAnnotations(data *C.pn_data_t) (v map[AnnotationKey]interface{}) {
-       if C.pn_data_size(data) > 0 {
-               C.pn_data_rewind(data)
-               C.pn_data_next(data)
-               unmarshal(&v, data)
+// Copy makes a deep copy of message x by encoding x and decoding the
+// result into m. Returns the encode or decode error, if any.
+func (m *message) Copy(x Message) error {
+       var mc MessageCodec
+       // Free the pn_message_t cached by the codec. Without this every Copy
+       // leaks one; message.Encode and message.Decode already do the same.
+       defer mc.Close()
+       data, err := mc.Encode(x, nil) // "data", so the bytes package is not shadowed
+       if err == nil {
+               err = mc.Decode(m, data)
        }
-       return v
-}
+       return err
+}
+
+type message struct { // Go-native message state; NewMessage returns a pointer to one of these
+       address               string
+       applicationProperties map[string]interface{} // allocated on first call to ApplicationProperties()
+       contentEncoding       string
+       contentType           string
+       correlationId         interface{}
+       creationTime          time.Time
+       deliveryAnnotations   map[AnnotationKey]interface{} // allocated on first call to DeliveryAnnotations()
+       deliveryCount         uint32
+       durable               bool
+       expiryTime            time.Time
+       firstAcquirer         bool
+       groupId               string
+       groupSequence         int32
+       inferred              bool
+       messageAnnotations    map[AnnotationKey]interface{} // allocated on first call to MessageAnnotations()
+       messageId             interface{}
+       priority              uint8 // Clear() sets this to 4 rather than 0
+       replyTo               string
+       replyToGroupId        string
+       subject               string
+       ttl                   time.Duration
+       userId                string
+       body                  interface{} // stored as given; put() marshals it when the message is encoded
+       // Keep the original data to support Unmarshal to a non-interface{} type
+       // Waste of memory, consider deprecating or making it optional.
+       pnBody *C.pn_data_t // NOTE(review): never assigned in the code visible in this patch
+}
+
+// ==== message get methods: each returns the stored field value as-is
+func (m *message) Body() interface{}          { return m.body }
+func (m *message) Inferred() bool             { return m.inferred }
+func (m *message) Durable() bool              { return m.durable }
+func (m *message) Priority() uint8            { return m.priority }
+func (m *message) TTL() time.Duration         { return m.ttl }
+func (m *message) FirstAcquirer() bool        { return m.firstAcquirer }
+func (m *message) DeliveryCount() uint32      { return m.deliveryCount }
+func (m *message) MessageId() interface{}     { return m.messageId }
+func (m *message) UserId() string             { return m.userId }
+func (m *message) Address() string            { return m.address }
+func (m *message) Subject() string            { return m.subject }
+func (m *message) ReplyTo() string            { return m.replyTo }
+func (m *message) CorrelationId() interface{} { return m.correlationId }
+func (m *message) ContentType() string        { return m.contentType }
+func (m *message) ContentEncoding() string    { return m.contentEncoding }
+func (m *message) ExpiryTime() time.Time      { return m.expiryTime }
+func (m *message) CreationTime() time.Time    { return m.creationTime }
+func (m *message) GroupId() string            { return m.groupId }
+func (m *message) GroupSequence() int32       { return m.groupSequence }
+func (m *message) ReplyToGroupId() string     { return m.replyToGroupId }
 
 func (m *message) DeliveryAnnotations() map[AnnotationKey]interface{} {
-       return getAnnotations(C.pn_message_instructions(m.pn))
+       if m.deliveryAnnotations == nil { // allocate on first use so the result is never nil
+               m.deliveryAnnotations = make(map[AnnotationKey]interface{})
+       }
+       return m.deliveryAnnotations // the message's own map, not a copy
 }
 func (m *message) MessageAnnotations() map[AnnotationKey]interface{} {
-       return getAnnotations(C.pn_message_annotations(m.pn))
+       if m.messageAnnotations == nil { // allocate on first use so the result is never nil
+               m.messageAnnotations = make(map[AnnotationKey]interface{})
+       }
+       return m.messageAnnotations // the message's own map, not a copy
 }
-
 func (m *message) ApplicationProperties() map[string]interface{} {
-       var v map[string]interface{}
-       data := C.pn_message_properties(m.pn)
-       if C.pn_data_size(data) > 0 {
-               C.pn_data_rewind(data)
-               C.pn_data_next(data)
-               unmarshal(&v, data)
+       if m.applicationProperties == nil { // allocate on first use so the result is never nil
+               m.applicationProperties = make(map[string]interface{})
        }
-       return v
+       return m.applicationProperties // the message's own map, not a copy
 }
 
 // ==== message set methods
 
-func setData(v interface{}, data *C.pn_data_t) {
-       C.pn_data_clear(data)
-       marshal(v, data)
-}
+func (m *message) SetBody(v interface{})          { m.body = v }
+func (m *message) SetInferred(x bool)             { m.inferred = x }
+func (m *message) SetDurable(x bool)              { m.durable = x }
+func (m *message) SetPriority(x uint8)            { m.priority = x }
+func (m *message) SetTTL(x time.Duration)         { m.ttl = x }
+func (m *message) SetFirstAcquirer(x bool)        { m.firstAcquirer = x }
+func (m *message) SetDeliveryCount(x uint32)      { m.deliveryCount = x }
+func (m *message) SetMessageId(x interface{})     { m.messageId = x }
+func (m *message) SetUserId(x string)             { m.userId = x }
+func (m *message) SetAddress(x string)            { m.address = x }
+func (m *message) SetSubject(x string)            { m.subject = x }
+func (m *message) SetReplyTo(x string)            { m.replyTo = x }
+func (m *message) SetCorrelationId(x interface{}) { m.correlationId = x }
+func (m *message) SetContentType(x string)        { m.contentType = x }
+func (m *message) SetContentEncoding(x string)    { m.contentEncoding = x }
+func (m *message) SetExpiryTime(x time.Time)      { m.expiryTime = x }
+func (m *message) SetCreationTime(x time.Time)    { m.creationTime = x }
+func (m *message) SetGroupId(x string)            { m.groupId = x }
+func (m *message) SetGroupSequence(x int32)       { m.groupSequence = x }
+func (m *message) SetReplyToGroupId(x string)     { m.replyToGroupId = x }
+
+func (m *message) SetDeliveryAnnotations(x map[AnnotationKey]interface{}) {
+       m.deliveryAnnotations = x // keeps the caller's map, no copy is made
+}
+func (m *message) SetMessageAnnotations(x map[AnnotationKey]interface{}) {
+       m.messageAnnotations = x // keeps the caller's map, no copy is made
+}
+func (m *message) SetApplicationProperties(x map[string]interface{}) {
+       m.applicationProperties = x // keeps the caller's map, no copy is made
+}
+
+// Marshal body from v, same as SetBody(v): the value is only stored here. See amqp.Marshal.
+func (m *message) Marshal(v interface{}) { m.body = v }
 
-func (m *message) SetInferred(b bool)  { C.pn_message_set_inferred(m.pn, 
C.bool(b)) }
-func (m *message) SetDurable(b bool)   { C.pn_message_set_durable(m.pn, 
C.bool(b)) }
-func (m *message) SetPriority(b uint8) { C.pn_message_set_priority(m.pn, 
C.uint8_t(b)) }
-func (m *message) SetTTL(d time.Duration) {
-       C.pn_message_set_ttl(m.pn, C.pn_millis_t(d/time.Millisecond))
-}
-func (m *message) SetFirstAcquirer(b bool)     { 
C.pn_message_set_first_acquirer(m.pn, C.bool(b)) }
-func (m *message) SetDeliveryCount(c uint32)   { 
C.pn_message_set_delivery_count(m.pn, C.uint32_t(c)) }
-func (m *message) SetMessageId(id interface{}) { setData(id, 
C.pn_message_id(m.pn)) }
-func (m *message) SetUserId(s string)          { 
C.pn_message_set_user_id(m.pn, pnBytes(([]byte)(s))) }
-func (m *message) SetAddress(s string) {
-       C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_address))
-}
-func (m *message) SetSubject(s string) {
-       C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_subject))
-}
-func (m *message) SetReplyTo(s string) {
-       C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_reply_to))
-}
-func (m *message) SetCorrelationId(c interface{}) { setData(c, 
C.pn_message_correlation_id(m.pn)) }
-func (m *message) SetContentType(s string) {
-       C.msg_set_str(m.pn, C.CString(s), 
C.set_fn(C.pn_message_set_content_type))
-}
-func (m *message) SetContentEncoding(s string) {
-       C.msg_set_str(m.pn, C.CString(s), 
C.set_fn(C.pn_message_set_content_encoding))
-}
-func (m *message) SetExpiryTime(t time.Time)   { 
C.pn_message_set_expiry_time(m.pn, pnTime(t)) }
-func (m *message) SetCreationTime(t time.Time) { 
C.pn_message_set_creation_time(m.pn, pnTime(t)) }
-func (m *message) SetGroupId(s string) {
-       C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_group_id))
-}
-func (m *message) SetGroupSequence(s int32) {
-       C.pn_message_set_group_sequence(m.pn, C.pn_sequence_t(s))
-}
-func (m *message) SetReplyToGroupId(s string) {
-       C.msg_set_str(m.pn, C.CString(s), 
C.set_fn(C.pn_message_set_reply_to_group_id))
+// Unmarshal the message body into the value pointed to by v. The stored body
+// is marshaled into a temporary pn_data_t which is then unmarshaled into v.
+func (m *message) Unmarshal(v interface{}) {
+       // FIXME aconway 2018-09-28: this is inefficient, replace with a
+       // reflective conversion from the existing body value that respects
+       // the Unmarshal() rules.
+       pnData := C.pn_data(2)
+       // pn_data() allocates on the C heap, which the Go garbage collector
+       // does not manage. Free it explicitly, otherwise every call leaks a
+       // pn_data_t. Deferred so it is also released if marshal/unmarshal panic.
+       defer C.pn_data_free(pnData)
+       marshal(m.body, pnData)
+       unmarshal(v, pnData)
 }
 
-func (m *message) SetDeliveryAnnotations(v map[AnnotationKey]interface{}) {
-       setData(v, C.pn_message_instructions(m.pn))
-}
-func (m *message) SetMessageAnnotations(v map[AnnotationKey]interface{}) {
-       setData(v, C.pn_message_annotations(m.pn))
+// MessageCodec encodes and decodes messages through a cached pn_message_t; call Close() to free it. Internal use only
+type MessageCodec struct {
+       pn *C.pn_message_t // Cache a pn_message_t to speed up encode/decode
+       // Optionally remember a byte buffer to use with MessageCodec methods.
+       Buffer []byte // NOTE(review): not referenced by the codec code visible in this patch
 }
-func (m *message) SetApplicationProperties(v map[string]interface{}) {
-       setData(v, C.pn_message_properties(m.pn))
-}
-
-// Marshal body from v
-func (m *message) Marshal(v interface{}) { clearMarshal(v, 
C.pn_message_body(m.pn)) }
 
-// Unmarshal body to v, which must be a pointer to a value. See amqp.Unmarshal
-func (m *message) Unmarshal(v interface{}) {
-       data := C.pn_message_body(m.pn)
-       if C.pn_data_size(data) > 0 {
-               C.pn_data_rewind(data)
-               C.pn_data_next(data)
-               unmarshal(v, data)
+func (mc *MessageCodec) pnMessage() *C.pn_message_t { // returns the cached pn_message_t, creating it on first use
+       if mc.pn == nil {
+               mc.pn = C.pn_message()
        }
-       return
+       return mc.pn
 }
 
-// Return the body value as an interface
-func (m *message) Body() (v interface{}) { m.Unmarshal(&v); return }
-
-func (m *message) Decode(data []byte) error {
-       m.Clear()
-       if len(data) == 0 {
-               return fmt.Errorf("empty buffer for decode")
+func (mc *MessageCodec) Close() { // frees the cached pn_message_t, if any; calling it again is a no-op
+       if mc.pn != nil {
+               C.pn_message_free(mc.pn)
+               mc.pn = nil // a later pnMessage() call allocates a fresh one
        }
-       if C.pn_message_decode(m.pn, cPtr(data), cLen(data)) < 0 {
-               return fmt.Errorf("decoding message: %s", 
PnError(C.pn_message_error(m.pn)))
+}
+
+// Decode decodes AMQP-encoded data into m, using the codec's cached
+// pn_message_t. On success the previous content of m is replaced.
+//
+// m must be a message created by this package. Any other implementation of
+// Message is reported as an error instead of panicking on the type assertion.
+func (mc *MessageCodec) Decode(m Message, data []byte) error {
+       native, ok := m.(*message)
+       if !ok {
+               return fmt.Errorf("decoding message: unsupported Message implementation %T", m)
+       }
+       pn := mc.pnMessage()
+       if C.pn_message_decode(pn, cPtr(data), cLen(data)) < 0 {
+               return fmt.Errorf("decoding message: %s", PnError(C.pn_message_error(pn)))
        }
+       native.get(pn)
        return nil
 }
 
+func (m *message) Decode(data []byte) error {
+       var mc MessageCodec
+       defer mc.Close()
+       return mc.Decode(m, data)
+}
+
 func DecodeMessage(data []byte) (m Message, err error) {
        m = NewMessage()
        err = m.Decode(data)
        return
 }
 
-func (m *message) Encode(buffer []byte) ([]byte, error) {
+// Encode m using buffer. Return the final buffer used to hold m,
+// may be different if the initial buffer was not large enough.
+func (mc *MessageCodec) Encode(m Message, buffer []byte) ([]byte, error) {
+       pn := mc.pnMessage()
+       m.(*message).put(pn)
        encode := func(buf []byte) ([]byte, error) {
                len := cLen(buf)
-               result := C.pn_message_encode(m.pn, cPtr(buf), &len)
+               result := C.pn_message_encode(pn, cPtr(buf), &len)
                switch {
                case result == C.PN_OVERFLOW:
                        return buf, overflow
@@ -380,50 +395,214 @@ func (m *message) Encode(buffer []byte) ([]byte, error) {
        return encodeGrow(buffer, encode)
 }
 
+func (m *message) Encode(buffer []byte) ([]byte, error) {
+       var mc MessageCodec
+       defer mc.Close()
+       return mc.Encode(m, buffer)
+}
+
 // TODO aconway 2015-09-14: Multi-section messages.
 
-func (m *message) String() string {
-       str := C.pn_string(C.CString(""))
-       defer C.pn_free(unsafe.Pointer(str))
-       C.pn_inspect(unsafe.Pointer(m.pn), str)
-       return C.GoString(C.pn_string_get(str))
+// ignoreFunc reports whether a field value should be left out of the
+// output of message.String().
+type ignoreFunc func(v interface{}) bool
+
+// isNil ignores nil interface values.
+func isNil(v interface{}) bool { return v == nil }
+
+// isZero ignores the zero value of v's dynamic type; a nil interface counts
+// as zero. reflect.DeepEqual is used rather than == so that a value of a
+// non-comparable type (slice, map, func) cannot cause a run-time panic.
+func isZero(v interface{}) bool {
+       if v == nil {
+               return true
+       }
+       return reflect.DeepEqual(v, reflect.Zero(reflect.TypeOf(v)).Interface())
+}
+
+// isEmpty ignores nil and any string, map, slice, array or channel of
+// length 0. A value whose kind has no length is never empty; calling Len()
+// on it would panic.
+func isEmpty(v interface{}) bool {
+       if v == nil {
+               return true
+       }
+       switch rv := reflect.ValueOf(v); rv.Kind() {
+       case reflect.Array, reflect.Chan, reflect.Map, reflect.Slice, reflect.String:
+               return rv.Len() == 0
+       }
+       return false
+}
+
+type stringBuilder struct { // accumulates the list of "name: value" pairs written by field()
+       bytes.Buffer
+       separator string // written before each field; empty until the first field has been written
 }
 
-// ==== Deprecated functions
-func oldGetAnnotations(data *C.pn_data_t) (v map[string]interface{}) {
-       if C.pn_data_size(data) > 0 {
+// field appends "name: value" to the buffer, unless ignore(value) reports
+// true. Every field written after the first one is preceded by ", ".
+func (b *stringBuilder) field(name string, value interface{}, ignore ignoreFunc) {
+       if ignore(value) {
+               return
+       }
+       b.WriteString(b.separator)
+       b.separator = ", "
+       b.WriteString(name)
+       b.WriteString(": ")
+       fmt.Fprintf(&b.Buffer, "%v", value)
+}
+
+// String returns a human-readable description of the message.
+// Only fields holding a non-default value are included.
+func (m *message) String() string {
+       // Priority has weird default
+       isDefaultPriority := func(v interface{}) bool { return v.(uint8) == 4 }
+
+       // Fields in output order, each with the test for "has its default value".
+       fields := []struct {
+               name   string
+               value  interface{}
+               ignore ignoreFunc
+       }{
+               {"address", m.address, isEmpty},
+               {"durable", m.durable, isZero},
+               {"priority", m.priority, isDefaultPriority},
+               {"ttl", m.ttl, isZero},
+               {"first-acquirer", m.firstAcquirer, isZero},
+               {"delivery-count", m.deliveryCount, isZero},
+               {"message-id", m.messageId, isNil},
+               {"user-id", m.userId, isEmpty},
+               {"subject", m.subject, isEmpty},
+               {"reply-to", m.replyTo, isEmpty},
+               {"correlation-id", m.correlationId, isNil},
+               {"content-type", m.contentType, isEmpty},
+               {"content-encoding", m.contentEncoding, isEmpty},
+               {"expiry-time", m.expiryTime, isZero},
+               {"creation-time", m.creationTime, isZero},
+               {"group-id", m.groupId, isEmpty},
+               {"group-sequence", m.groupSequence, isZero},
+               {"reply-to-group-id", m.replyToGroupId, isEmpty},
+               {"inferred", m.inferred, isZero},
+               {"delivery-annotations", m.deliveryAnnotations, isEmpty},
+               {"message-annotations", m.messageAnnotations, isEmpty},
+               {"application-properties", m.applicationProperties, isEmpty},
+               {"body", m.body, isNil},
+       }
+
+       var sb stringBuilder
+       sb.WriteString("Message{")
+       for _, f := range fields {
+               sb.field(f.name, f.value, f.ignore)
+       }
+       sb.WriteString("}")
+       return sb.String()
+}
+
+// ==== get message from pn_message_t
+
+// getData unmarshals the first value held by data into v; callers pass a
+// pointer to the destination. v is left untouched when data is nil or empty.
+func getData(v interface{}, data *C.pn_data_t) {
+       if data != nil && C.pn_data_size(data) > 0 {
                C.pn_data_rewind(data)
                C.pn_data_next(data)
-               unmarshal(&v, data)
+               unmarshal(v, data)
+       }
+}
+
+// getString converts a C string to a Go string; a nil pointer gives "".
+func getString(c *C.char) string {
+       if c != nil {
+               return C.GoString(c)
+       }
+       return ""
+}
+
+func (m *message) get(pn *C.pn_message_t) { // overwrites m with the content of pn
+       m.Clear() // start from defaults: getData leaves a field untouched when pn holds no data for it
+       m.inferred = bool(C.pn_message_is_inferred(pn))
+       m.durable = bool(C.pn_message_is_durable(pn))
+       m.priority = uint8(C.pn_message_get_priority(pn))
+       m.ttl = goDuration(C.pn_message_get_ttl(pn))
+       m.firstAcquirer = bool(C.pn_message_is_first_acquirer(pn))
+       m.deliveryCount = uint32(C.pn_message_get_delivery_count(pn))
+       getData(&m.messageId, C.pn_message_id(pn))
+       m.userId = string(goBytes(C.pn_message_get_user_id(pn)))
+       m.address = getString(C.pn_message_get_address(pn))
+       m.subject = getString(C.pn_message_get_subject(pn))
+       m.replyTo = getString(C.pn_message_get_reply_to(pn))
+       getData(&m.correlationId, C.pn_message_correlation_id(pn))
+       m.contentType = getString(C.pn_message_get_content_type(pn))
+       m.contentEncoding = getString(C.pn_message_get_content_encoding(pn))
+       m.expiryTime = goTime(C.pn_message_get_expiry_time(pn))
+       m.creationTime = goTime(C.pn_message_get_creation_time(pn))
+       m.groupId = getString(C.pn_message_get_group_id(pn))
+       m.groupSequence = int32(C.pn_message_get_group_sequence(pn))
+       m.replyToGroupId = getString(C.pn_message_get_reply_to_group_id(pn))
+       getData(&m.deliveryAnnotations, C.pn_message_instructions(pn))
+       getData(&m.messageAnnotations, C.pn_message_annotations(pn))
+       getData(&m.applicationProperties, C.pn_message_properties(pn))
+       getData(&m.body, C.pn_message_body(pn)) // body is unmarshaled into an interface{} field
+}
+
+// ==== put message to pn_message_t
+
+// putData replaces the content of pn with the marshaled form of v.
+// A nil v leaves pn as it is.
+func putData(v interface{}, pn *C.pn_data_t) {
+       if v == nil {
+               return
+       }
+       C.pn_data_clear(pn)
+       marshal(v, pn)
+}
+
+// put clears pn and then copies the fields of m into it.
+//
+// For pointer-based fields (pn_data_t, strings, bytes) only
+// put a field if it has a non-empty value
+//
+// String fields are set through the msg_set_str cgo helper, as the setters
+// did before this change, instead of calling pn_message_set_*() directly
+// with C.CString(): C.CString allocates with malloc and nothing in this
+// function frees the result, so each non-empty string field leaked one
+// allocation per encode.
+// NOTE(review): msg_set_str is defined in the cgo preamble, outside this
+// patch; confirm that it frees the string after calling the setter.
+func (m *message) put(pn *C.pn_message_t) {
+       C.pn_message_clear(pn)
+       C.pn_message_set_inferred(pn, C.bool(m.inferred))
+       C.pn_message_set_durable(pn, C.bool(m.durable))
+       C.pn_message_set_priority(pn, C.uint8_t(m.priority))
+       C.pn_message_set_ttl(pn, pnDuration(m.ttl))
+       C.pn_message_set_first_acquirer(pn, C.bool(m.firstAcquirer))
+       C.pn_message_set_delivery_count(pn, C.uint32_t(m.deliveryCount))
+       putData(m.messageId, C.pn_message_id(pn))
+       if m.userId != "" {
+               C.pn_message_set_user_id(pn, pnBytes(([]byte)(m.userId)))
+       }
+       if m.address != "" {
+               C.msg_set_str(pn, C.CString(m.address), C.set_fn(C.pn_message_set_address))
+       }
+       if m.subject != "" {
+               C.msg_set_str(pn, C.CString(m.subject), C.set_fn(C.pn_message_set_subject))
+       }
+       if m.replyTo != "" {
+               C.msg_set_str(pn, C.CString(m.replyTo), C.set_fn(C.pn_message_set_reply_to))
+       }
+       putData(m.correlationId, C.pn_message_correlation_id(pn))
+       if m.contentType != "" {
+               C.msg_set_str(pn, C.CString(m.contentType), C.set_fn(C.pn_message_set_content_type))
+       }
+       if m.contentEncoding != "" {
+               C.msg_set_str(pn, C.CString(m.contentEncoding), C.set_fn(C.pn_message_set_content_encoding))
+       }
+       C.pn_message_set_expiry_time(pn, pnTime(m.expiryTime))
+       C.pn_message_set_creation_time(pn, pnTime(m.creationTime))
+       if m.groupId != "" {
+               C.msg_set_str(pn, C.CString(m.groupId), C.set_fn(C.pn_message_set_group_id))
        }
-       return v
+       C.pn_message_set_group_sequence(pn, C.pn_sequence_t(m.groupSequence))
+       if m.replyToGroupId != "" {
+               C.msg_set_str(pn, C.CString(m.replyToGroupId), C.set_fn(C.pn_message_set_reply_to_group_id))
+       }
+       // Empty maps are skipped entirely, so pn keeps the cleared (absent) section.
+       if len(m.deliveryAnnotations) != 0 {
+               putData(m.deliveryAnnotations, C.pn_message_instructions(pn))
+       }
+       if len(m.messageAnnotations) != 0 {
+               putData(m.messageAnnotations, C.pn_message_annotations(pn))
+       }
+       if len(m.applicationProperties) != 0 {
+               putData(m.applicationProperties, C.pn_message_properties(pn))
+       }
+       putData(m.body, C.pn_message_body(pn))
+}
+
+// ==== Deprecated functions
+
+// oldAnnotations converts an AnnotationKey-keyed map into a new map keyed by
+// the String() form of each key. A nil or empty input gives a nil result.
+func oldAnnotations(in map[AnnotationKey]interface{}) (out map[string]interface{}) {
+       if len(in) > 0 {
+               out = make(map[string]interface{})
+               for key, value := range in {
+                       out[key.String()] = value
+               }
+       }
+       return
 }
 
 func (m *message) Instructions() map[string]interface{} {
-       return oldGetAnnotations(C.pn_message_instructions(m.pn))
+       return oldAnnotations(m.deliveryAnnotations)
 }
 func (m *message) Annotations() map[string]interface{} {
-       return oldGetAnnotations(C.pn_message_annotations(m.pn))
+       return oldAnnotations(m.messageAnnotations)
 }
 func (m *message) Properties() map[string]interface{} {
-       return oldGetAnnotations(C.pn_message_properties(m.pn))
+       return m.applicationProperties
 }
 
 // Convert old string-keyed annotations to an AnnotationKey map
-func fixAnnotations(old map[string]interface{}) (annotations 
map[AnnotationKey]interface{}) {
-       annotations = make(map[AnnotationKey]interface{})
-       for k, v := range old {
-               annotations[AnnotationKeyString(k)] = v
+func newAnnotations(in map[string]interface{}) (out 
map[AnnotationKey]interface{}) {
+       if len(in) == 0 { // a nil or empty input converts to a nil map
+               return nil
+       }
+       out = make(map[AnnotationKey]interface{})
+       for k, v := range in {
+               out[AnnotationKeyString(k)] = v // values are carried over as-is, only the key type changes
        }
        return
 }
 
 func (m *message) SetInstructions(v map[string]interface{}) {
-       setData(fixAnnotations(v), C.pn_message_instructions(m.pn))
+       m.deliveryAnnotations = newAnnotations(v) // converted copy with AnnotationKey keys; nil when v is empty
 }
 func (m *message) SetAnnotations(v map[string]interface{}) {
-       setData(fixAnnotations(v), C.pn_message_annotations(m.pn))
+       m.messageAnnotations = newAnnotations(v) // converted copy with AnnotationKey keys; nil when v is empty
 }
 func (m *message) SetProperties(v map[string]interface{}) {
-       setData(fixAnnotations(v), C.pn_message_properties(m.pn))
+       m.applicationProperties = v // keeps the caller's map, no conversion or copy
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/886d2b93/go/src/qpid.apache.org/amqp/message_test.go
----------------------------------------------------------------------
diff --git a/go/src/qpid.apache.org/amqp/message_test.go 
b/go/src/qpid.apache.org/amqp/message_test.go
index b22c60d..668ca49 100644
--- a/go/src/qpid.apache.org/amqp/message_test.go
+++ b/go/src/qpid.apache.org/amqp/message_test.go
@@ -58,14 +58,14 @@ func TestDefaultMessage(t *testing.T) {
                {"ReplyToGroupId", ""},
                {"MessageId", nil},
                {"CorrelationId", nil},
-               {"DeliveryAnnotations", map[AnnotationKey]interface{}(nil)},
-               {"MessageAnnotations", map[AnnotationKey]interface{}(nil)},
-               {"ApplicationProperties", map[string]interface{}(nil)},
+               {"DeliveryAnnotations", map[AnnotationKey]interface{}{}},
+               {"MessageAnnotations", map[AnnotationKey]interface{}{}},
+               {"ApplicationProperties", map[string]interface{}{}},
 
                // Deprecated
                {"Instructions", map[string]interface{}(nil)},
                {"Annotations", map[string]interface{}(nil)},
-               {"Properties", map[string]interface{}(nil)},
+               {"Properties", map[string]interface{}{}},
                {"Body", nil},
        } {
                ret := mv.MethodByName(x.method).Call(nil)
@@ -91,7 +91,7 @@ func TestMessageString(t *testing.T) {
        if err := roundTrip(m); err != nil {
                t.Error(err)
        }
-       msgstr := `Message{user_id="user", instructions={:instructions="foo"}, 
annotations={:annotations="bar"}, properties={"int"=32}, body="hello"}`
+       msgstr := "Message{user-id: user, delivery-annotations: 
map[instructions:foo], message-annotations: map[annotations:bar], 
application-properties: map[int:32], body: hello}"
        if err := checkEqual(msgstr, m.String()); err != nil {
                t.Error(err)
        }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to