PROTON-1910: [go] native Message implementation
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/886d2b93 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/886d2b93 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/886d2b93 Branch: refs/heads/master Commit: 886d2b9349f74c02219b29990ee04278124f5224 Parents: ef716fa Author: Alan Conway <acon...@redhat.com> Authored: Thu Oct 11 15:20:23 2018 -0400 Committer: Alan Conway <acon...@redhat.com> Committed: Thu Oct 11 15:20:23 2018 -0400 ---------------------------------------------------------------------- go/src/qpid.apache.org/amqp/message.go | 521 +++++++++++++++-------- go/src/qpid.apache.org/amqp/message_test.go | 10 +- 2 files changed, 355 insertions(+), 176 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/886d2b93/go/src/qpid.apache.org/amqp/message.go ---------------------------------------------------------------------- diff --git a/go/src/qpid.apache.org/amqp/message.go b/go/src/qpid.apache.org/amqp/message.go index e514b26..919904c 100644 --- a/go/src/qpid.apache.org/amqp/message.go +++ b/go/src/qpid.apache.org/amqp/message.go @@ -35,10 +35,10 @@ package amqp import "C" import ( + "bytes" "fmt" - "runtime" + "reflect" "time" - "unsafe" ) // Message is the interface to an AMQP message. @@ -124,11 +124,12 @@ type Message interface { // Per-delivery annotations to provide delivery instructions. // May be added or removed by intermediaries during delivery. + // See ApplicationProperties() for properties set by the application. DeliveryAnnotations() map[AnnotationKey]interface{} SetDeliveryAnnotations(map[AnnotationKey]interface{}) // Message annotations added as part of the bare message at creation, usually - // by an AMQP library. See ApplicationProperties() for adding application data. + // by an AMQP library. See ApplicationProperties() for properties set by the application. MessageAnnotations() map[AnnotationKey]interface{} SetMessageAnnotations(map[AnnotationKey]interface{}) @@ -141,15 +142,18 @@ type Message interface { Inferred() bool SetInferred(bool) - // Marshal a Go value into the message body. See amqp.Marshal() for details. + // Get the message body, using the amqp.Unmarshal() rules for interface{} + Body() interface{} + + // Set the body using amqp.Marshal() + SetBody(interface{}) + + // Marshal a Go value into the message body, synonym for SetBody() Marshal(interface{}) - // Unmarshal the message body into the value pointed to by v. See amqp.Unmarshal() for details. + // Unmarshal the message body into the value pointed to by v. See amqp.Unmarshal() Unmarshal(interface{}) - // Body value resulting from the default unmarshaling of message body as interface{} - Body() interface{} - // Encode encodes the message as AMQP data. If buffer is non-nil and is large enough // the message is encoded into it, otherwise a new buffer is created. // Returns the buffer containing the message. @@ -158,7 +162,7 @@ type Message interface { // Decode data into this message. Overwrites an existing message content. Decode(buffer []byte) error - // Clear the message contents. + // Clear the message contents, set all fields to the default value. Clear() // Copy the contents of another message to this one. @@ -180,194 +184,205 @@ type Message interface { String() string } -type message struct{ pn *C.pn_message_t } - -func freeMessage(m *message) { - C.pn_message_free(m.pn) - m.pn = nil -} - // NewMessage creates a new message instance. func NewMessage() Message { - m := &message{C.pn_message()} - runtime.SetFinalizer(m, freeMessage) + m := &message{} + m.Clear() return m } -// NewMessageWith creates a message with value as the body. Equivalent to -// m := NewMessage(); m.Marshal(body) +// NewMessageWith creates a message with value as the body. func NewMessageWith(value interface{}) Message { m := NewMessage() - m.Marshal(value) + m.SetBody(value) return m } -func (m *message) Clear() { C.pn_message_clear(m.pn) } - -func (m *message) Copy(x Message) error { - if data, err := x.Encode(nil); err == nil { - return m.Decode(data) - } else { - return err - } -} - -// ==== message get functions - -func rewindGet(data *C.pn_data_t) (v interface{}) { - C.pn_data_rewind(data) - C.pn_data_next(data) - unmarshal(&v, data) - return v -} - -func (m *message) Inferred() bool { return bool(C.pn_message_is_inferred(m.pn)) } -func (m *message) Durable() bool { return bool(C.pn_message_is_durable(m.pn)) } -func (m *message) Priority() uint8 { return uint8(C.pn_message_get_priority(m.pn)) } -func (m *message) TTL() time.Duration { - return time.Duration(C.pn_message_get_ttl(m.pn)) * time.Millisecond +// NewMessageCopy creates a copy of an existing message. +func NewMessageCopy(m Message) Message { + m2 := NewMessage() + m2.Copy(m) + return m2 } -func (m *message) FirstAcquirer() bool { return bool(C.pn_message_is_first_acquirer(m.pn)) } -func (m *message) DeliveryCount() uint32 { return uint32(C.pn_message_get_delivery_count(m.pn)) } -func (m *message) MessageId() interface{} { return rewindGet(C.pn_message_id(m.pn)) } -func (m *message) UserId() string { return goString(C.pn_message_get_user_id(m.pn)) } -func (m *message) Address() string { return C.GoString(C.pn_message_get_address(m.pn)) } -func (m *message) Subject() string { return C.GoString(C.pn_message_get_subject(m.pn)) } -func (m *message) ReplyTo() string { return C.GoString(C.pn_message_get_reply_to(m.pn)) } -func (m *message) CorrelationId() interface{} { return rewindGet(C.pn_message_correlation_id(m.pn)) } -func (m *message) ContentType() string { return C.GoString(C.pn_message_get_content_type(m.pn)) } -func (m *message) ContentEncoding() string { return C.GoString(C.pn_message_get_content_encoding(m.pn)) } -func (m *message) ExpiryTime() time.Time { - return time.Unix(0, int64(time.Millisecond*time.Duration(C.pn_message_get_expiry_time(m.pn)))) -} -func (m *message) CreationTime() time.Time { - return time.Unix(0, int64(time.Millisecond)*int64(C.pn_message_get_creation_time(m.pn))) -} -func (m *message) GroupId() string { return C.GoString(C.pn_message_get_group_id(m.pn)) } -func (m *message) GroupSequence() int32 { return int32(C.pn_message_get_group_sequence(m.pn)) } -func (m *message) ReplyToGroupId() string { return C.GoString(C.pn_message_get_reply_to_group_id(m.pn)) } +// Reset message to all default values +func (m *message) Clear() { *m = message{priority: 4} } -func getAnnotations(data *C.pn_data_t) (v map[AnnotationKey]interface{}) { - if C.pn_data_size(data) > 0 { - C.pn_data_rewind(data) - C.pn_data_next(data) - unmarshal(&v, data) +// Copy makes a deep copy of message x +func (m *message) Copy(x Message) error { + var mc MessageCodec + bytes, err := mc.Encode(x, nil) + if err == nil { + err = mc.Decode(m, bytes) } - return v -} + return err +} + +type message struct { + address string + applicationProperties map[string]interface{} + contentEncoding string + contentType string + correlationId interface{} + creationTime time.Time + deliveryAnnotations map[AnnotationKey]interface{} + deliveryCount uint32 + durable bool + expiryTime time.Time + firstAcquirer bool + groupId string + groupSequence int32 + inferred bool + messageAnnotations map[AnnotationKey]interface{} + messageId interface{} + priority uint8 + replyTo string + replyToGroupId string + subject string + ttl time.Duration + userId string + body interface{} + // Keep the original data to support Unmarshal to a non-interface{} type + // Waste of memory, consider deprecating or making it optional. + pnBody *C.pn_data_t +} + +// ==== message get methods +func (m *message) Body() interface{} { return m.body } +func (m *message) Inferred() bool { return m.inferred } +func (m *message) Durable() bool { return m.durable } +func (m *message) Priority() uint8 { return m.priority } +func (m *message) TTL() time.Duration { return m.ttl } +func (m *message) FirstAcquirer() bool { return m.firstAcquirer } +func (m *message) DeliveryCount() uint32 { return m.deliveryCount } +func (m *message) MessageId() interface{} { return m.messageId } +func (m *message) UserId() string { return m.userId } +func (m *message) Address() string { return m.address } +func (m *message) Subject() string { return m.subject } +func (m *message) ReplyTo() string { return m.replyTo } +func (m *message) CorrelationId() interface{} { return m.correlationId } +func (m *message) ContentType() string { return m.contentType } +func (m *message) ContentEncoding() string { return m.contentEncoding } +func (m *message) ExpiryTime() time.Time { return m.expiryTime } +func (m *message) CreationTime() time.Time { return m.creationTime } +func (m *message) GroupId() string { return m.groupId } +func (m *message) GroupSequence() int32 { return m.groupSequence } +func (m *message) ReplyToGroupId() string { return m.replyToGroupId } func (m *message) DeliveryAnnotations() map[AnnotationKey]interface{} { - return getAnnotations(C.pn_message_instructions(m.pn)) + if m.deliveryAnnotations == nil { + m.deliveryAnnotations = make(map[AnnotationKey]interface{}) + } + return m.deliveryAnnotations } func (m *message) MessageAnnotations() map[AnnotationKey]interface{} { - return getAnnotations(C.pn_message_annotations(m.pn)) + if m.messageAnnotations == nil { + m.messageAnnotations = make(map[AnnotationKey]interface{}) + } + return m.messageAnnotations } - func (m *message) ApplicationProperties() map[string]interface{} { - var v map[string]interface{} - data := C.pn_message_properties(m.pn) - if C.pn_data_size(data) > 0 { - C.pn_data_rewind(data) - C.pn_data_next(data) - unmarshal(&v, data) + if m.applicationProperties == nil { + m.applicationProperties = make(map[string]interface{}) } - return v + return m.applicationProperties } // ==== message set methods -func setData(v interface{}, data *C.pn_data_t) { - C.pn_data_clear(data) - marshal(v, data) -} +func (m *message) SetBody(v interface{}) { m.body = v } +func (m *message) SetInferred(x bool) { m.inferred = x } +func (m *message) SetDurable(x bool) { m.durable = x } +func (m *message) SetPriority(x uint8) { m.priority = x } +func (m *message) SetTTL(x time.Duration) { m.ttl = x } +func (m *message) SetFirstAcquirer(x bool) { m.firstAcquirer = x } +func (m *message) SetDeliveryCount(x uint32) { m.deliveryCount = x } +func (m *message) SetMessageId(x interface{}) { m.messageId = x } +func (m *message) SetUserId(x string) { m.userId = x } +func (m *message) SetAddress(x string) { m.address = x } +func (m *message) SetSubject(x string) { m.subject = x } +func (m *message) SetReplyTo(x string) { m.replyTo = x } +func (m *message) SetCorrelationId(x interface{}) { m.correlationId = x } +func (m *message) SetContentType(x string) { m.contentType = x } +func (m *message) SetContentEncoding(x string) { m.contentEncoding = x } +func (m *message) SetExpiryTime(x time.Time) { m.expiryTime = x } +func (m *message) SetCreationTime(x time.Time) { m.creationTime = x } +func (m *message) SetGroupId(x string) { m.groupId = x } +func (m *message) SetGroupSequence(x int32) { m.groupSequence = x } +func (m *message) SetReplyToGroupId(x string) { m.replyToGroupId = x } + +func (m *message) SetDeliveryAnnotations(x map[AnnotationKey]interface{}) { + m.deliveryAnnotations = x +} +func (m *message) SetMessageAnnotations(x map[AnnotationKey]interface{}) { + m.messageAnnotations = x +} +func (m *message) SetApplicationProperties(x map[string]interface{}) { + m.applicationProperties = x +} + +// Marshal body from v, same as SetBody(v). See amqp.Marshal. +func (m *message) Marshal(v interface{}) { m.body = v } -func (m *message) SetInferred(b bool) { C.pn_message_set_inferred(m.pn, C.bool(b)) } -func (m *message) SetDurable(b bool) { C.pn_message_set_durable(m.pn, C.bool(b)) } -func (m *message) SetPriority(b uint8) { C.pn_message_set_priority(m.pn, C.uint8_t(b)) } -func (m *message) SetTTL(d time.Duration) { - C.pn_message_set_ttl(m.pn, C.pn_millis_t(d/time.Millisecond)) -} -func (m *message) SetFirstAcquirer(b bool) { C.pn_message_set_first_acquirer(m.pn, C.bool(b)) } -func (m *message) SetDeliveryCount(c uint32) { C.pn_message_set_delivery_count(m.pn, C.uint32_t(c)) } -func (m *message) SetMessageId(id interface{}) { setData(id, C.pn_message_id(m.pn)) } -func (m *message) SetUserId(s string) { C.pn_message_set_user_id(m.pn, pnBytes(([]byte)(s))) } -func (m *message) SetAddress(s string) { - C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_address)) -} -func (m *message) SetSubject(s string) { - C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_subject)) -} -func (m *message) SetReplyTo(s string) { - C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_reply_to)) -} -func (m *message) SetCorrelationId(c interface{}) { setData(c, C.pn_message_correlation_id(m.pn)) } -func (m *message) SetContentType(s string) { - C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_content_type)) -} -func (m *message) SetContentEncoding(s string) { - C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_content_encoding)) -} -func (m *message) SetExpiryTime(t time.Time) { C.pn_message_set_expiry_time(m.pn, pnTime(t)) } -func (m *message) SetCreationTime(t time.Time) { C.pn_message_set_creation_time(m.pn, pnTime(t)) } -func (m *message) SetGroupId(s string) { - C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_group_id)) -} -func (m *message) SetGroupSequence(s int32) { - C.pn_message_set_group_sequence(m.pn, C.pn_sequence_t(s)) -} -func (m *message) SetReplyToGroupId(s string) { - C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_reply_to_group_id)) +func (m *message) Unmarshal(v interface{}) { + // FIXME aconway 2018-09-28: this is inefficient, replace with a + // reflective conversion from the existing body value that respects + // the Unmarshal() rules. + pnData := C.pn_data(2) + marshal(m.body, pnData) + unmarshal(v, pnData) } -func (m *message) SetDeliveryAnnotations(v map[AnnotationKey]interface{}) { - setData(v, C.pn_message_instructions(m.pn)) -} -func (m *message) SetMessageAnnotations(v map[AnnotationKey]interface{}) { - setData(v, C.pn_message_annotations(m.pn)) +// Internal use only +type MessageCodec struct { + pn *C.pn_message_t // Cache a pn_message_t to speed up encode/decode + // Optionally remember a byte buffer to use with MessageCodec methods. + Buffer []byte } -func (m *message) SetApplicationProperties(v map[string]interface{}) { - setData(v, C.pn_message_properties(m.pn)) -} - -// Marshal body from v -func (m *message) Marshal(v interface{}) { clearMarshal(v, C.pn_message_body(m.pn)) } -// Unmarshal body to v, which must be a pointer to a value. See amqp.Unmarshal -func (m *message) Unmarshal(v interface{}) { - data := C.pn_message_body(m.pn) - if C.pn_data_size(data) > 0 { - C.pn_data_rewind(data) - C.pn_data_next(data) - unmarshal(v, data) +func (mc *MessageCodec) pnMessage() *C.pn_message_t { + if mc.pn == nil { + mc.pn = C.pn_message() } - return + return mc.pn } -// Return the body value as an interface -func (m *message) Body() (v interface{}) { m.Unmarshal(&v); return } - -func (m *message) Decode(data []byte) error { - m.Clear() - if len(data) == 0 { - return fmt.Errorf("empty buffer for decode") +func (mc *MessageCodec) Close() { + if mc.pn != nil { + C.pn_message_free(mc.pn) + mc.pn = nil } - if C.pn_message_decode(m.pn, cPtr(data), cLen(data)) < 0 { - return fmt.Errorf("decoding message: %s", PnError(C.pn_message_error(m.pn))) +} + +func (mc *MessageCodec) Decode(m Message, data []byte) error { + pn := mc.pnMessage() + if C.pn_message_decode(pn, cPtr(data), cLen(data)) < 0 { + return fmt.Errorf("decoding message: %s", PnError(C.pn_message_error(pn))) } + m.(*message).get(pn) return nil } +func (m *message) Decode(data []byte) error { + var mc MessageCodec + defer mc.Close() + return mc.Decode(m, data) +} + func DecodeMessage(data []byte) (m Message, err error) { m = NewMessage() err = m.Decode(data) return } -func (m *message) Encode(buffer []byte) ([]byte, error) { +// Encode m using buffer. Return the final buffer used to hold m, +// may be different if the initial buffer was not large enough. +func (mc *MessageCodec) Encode(m Message, buffer []byte) ([]byte, error) { + pn := mc.pnMessage() + m.(*message).put(pn) encode := func(buf []byte) ([]byte, error) { len := cLen(buf) - result := C.pn_message_encode(m.pn, cPtr(buf), &len) + result := C.pn_message_encode(pn, cPtr(buf), &len) switch { case result == C.PN_OVERFLOW: return buf, overflow @@ -380,50 +395,214 @@ func (m *message) Encode(buffer []byte) ([]byte, error) { return encodeGrow(buffer, encode) } +func (m *message) Encode(buffer []byte) ([]byte, error) { + var mc MessageCodec + defer mc.Close() + return mc.Encode(m, buffer) +} + // TODO aconway 2015-09-14: Multi-section messages. -func (m *message) String() string { - str := C.pn_string(C.CString("")) - defer C.pn_free(unsafe.Pointer(str)) - C.pn_inspect(unsafe.Pointer(m.pn), str) - return C.GoString(C.pn_string_get(str)) +type ignoreFunc func(v interface{}) bool + +func isNil(v interface{}) bool { return v == nil } +func isZero(v interface{}) bool { return v == reflect.Zero(reflect.TypeOf(v)).Interface() } +func isEmpty(v interface{}) bool { return reflect.ValueOf(v).Len() == 0 } + +type stringBuilder struct { + bytes.Buffer + separator string } -// ==== Deprecated functions -func oldGetAnnotations(data *C.pn_data_t) (v map[string]interface{}) { - if C.pn_data_size(data) > 0 { +func (b *stringBuilder) field(name string, value interface{}, ignore ignoreFunc) { + if !ignore(value) { + b.WriteString(b.separator) + b.separator = ", " + b.WriteString(name) + b.WriteString(": ") + fmt.Fprintf(&b.Buffer, "%v", value) + } +} + +// Human-readable string describing message. +// Includes only message fields with non-default values. +func (m *message) String() string { + var b stringBuilder + b.WriteString("Message{") + b.field("address", m.address, isEmpty) + b.field("durable", m.durable, isZero) + // Priority has weird default + b.field("priority", m.priority, func(v interface{}) bool { return v.(uint8) == 4 }) + b.field("ttl", m.ttl, isZero) + b.field("first-acquirer", m.firstAcquirer, isZero) + b.field("delivery-count", m.deliveryCount, isZero) + b.field("message-id", m.messageId, isNil) + b.field("user-id", m.userId, isEmpty) + b.field("subject", m.subject, isEmpty) + b.field("reply-to", m.replyTo, isEmpty) + b.field("correlation-id", m.correlationId, isNil) + b.field("content-type", m.contentType, isEmpty) + b.field("content-encoding", m.contentEncoding, isEmpty) + b.field("expiry-time", m.expiryTime, isZero) + b.field("creation-time", m.creationTime, isZero) + b.field("group-id", m.groupId, isEmpty) + b.field("group-sequence", m.groupSequence, isZero) + b.field("reply-to-group-id", m.replyToGroupId, isEmpty) + b.field("inferred", m.inferred, isZero) + b.field("delivery-annotations", m.deliveryAnnotations, isEmpty) + b.field("message-annotations", m.messageAnnotations, isEmpty) + b.field("application-properties", m.applicationProperties, isEmpty) + b.field("body", m.body, isNil) + b.WriteString("}") + return b.String() +} + +// ==== get message from pn_message_t + +func getData(v interface{}, data *C.pn_data_t) { + if data != nil && C.pn_data_size(data) > 0 { C.pn_data_rewind(data) C.pn_data_next(data) - unmarshal(&v, data) + unmarshal(v, data) + } + return +} + +func getString(c *C.char) string { + if c == nil { + return "" + } + return C.GoString(c) +} + +func (m *message) get(pn *C.pn_message_t) { + m.Clear() + m.inferred = bool(C.pn_message_is_inferred(pn)) + m.durable = bool(C.pn_message_is_durable(pn)) + m.priority = uint8(C.pn_message_get_priority(pn)) + m.ttl = goDuration(C.pn_message_get_ttl(pn)) + m.firstAcquirer = bool(C.pn_message_is_first_acquirer(pn)) + m.deliveryCount = uint32(C.pn_message_get_delivery_count(pn)) + getData(&m.messageId, C.pn_message_id(pn)) + m.userId = string(goBytes(C.pn_message_get_user_id(pn))) + m.address = getString(C.pn_message_get_address(pn)) + m.subject = getString(C.pn_message_get_subject(pn)) + m.replyTo = getString(C.pn_message_get_reply_to(pn)) + getData(&m.correlationId, C.pn_message_correlation_id(pn)) + m.contentType = getString(C.pn_message_get_content_type(pn)) + m.contentEncoding = getString(C.pn_message_get_content_encoding(pn)) + m.expiryTime = goTime(C.pn_message_get_expiry_time(pn)) + m.creationTime = goTime(C.pn_message_get_creation_time(pn)) + m.groupId = getString(C.pn_message_get_group_id(pn)) + m.groupSequence = int32(C.pn_message_get_group_sequence(pn)) + m.replyToGroupId = getString(C.pn_message_get_reply_to_group_id(pn)) + getData(&m.deliveryAnnotations, C.pn_message_instructions(pn)) + getData(&m.messageAnnotations, C.pn_message_annotations(pn)) + getData(&m.applicationProperties, C.pn_message_properties(pn)) + getData(&m.body, C.pn_message_body(pn)) +} + +// ==== put message to pn_message_t + +func putData(v interface{}, pn *C.pn_data_t) { + if v != nil { + C.pn_data_clear(pn) + marshal(v, pn) + } +} + +// For pointer-based fields (pn_data_t, strings, bytes) only +// put a field if it has a non-empty value +func (m *message) put(pn *C.pn_message_t) { + C.pn_message_clear(pn) + C.pn_message_set_inferred(pn, C.bool(m.inferred)) + C.pn_message_set_durable(pn, C.bool(m.durable)) + C.pn_message_set_priority(pn, C.uint8_t(m.priority)) + C.pn_message_set_ttl(pn, pnDuration(m.ttl)) + C.pn_message_set_first_acquirer(pn, C.bool(m.firstAcquirer)) + C.pn_message_set_delivery_count(pn, C.uint32_t(m.deliveryCount)) + putData(m.messageId, C.pn_message_id(pn)) + if m.userId != "" { + C.pn_message_set_user_id(pn, pnBytes(([]byte)(m.userId))) + } + if m.address != "" { + C.pn_message_set_address(pn, C.CString(m.address)) + } + if m.subject != "" { + C.pn_message_set_subject(pn, C.CString(m.subject)) + } + if m.replyTo != "" { + C.pn_message_set_reply_to(pn, C.CString(m.replyTo)) + } + putData(m.correlationId, C.pn_message_correlation_id(pn)) + if m.contentType != "" { + C.pn_message_set_content_type(pn, C.CString(m.contentType)) + } + if m.contentEncoding != "" { + C.pn_message_set_content_encoding(pn, C.CString(m.contentEncoding)) + } + C.pn_message_set_expiry_time(pn, pnTime(m.expiryTime)) + C.pn_message_set_creation_time(pn, pnTime(m.creationTime)) + if m.groupId != "" { + C.pn_message_set_group_id(pn, C.CString(m.groupId)) } - return v + C.pn_message_set_group_sequence(pn, C.pn_sequence_t(m.groupSequence)) + if m.replyToGroupId != "" { + C.pn_message_set_reply_to_group_id(pn, C.CString(m.replyToGroupId)) + } + if len(m.deliveryAnnotations) != 0 { + putData(m.deliveryAnnotations, C.pn_message_instructions(pn)) + } + if len(m.messageAnnotations) != 0 { + putData(m.messageAnnotations, C.pn_message_annotations(pn)) + } + if len(m.applicationProperties) != 0 { + putData(m.applicationProperties, C.pn_message_properties(pn)) + } + putData(m.body, C.pn_message_body(pn)) +} + +// ==== Deprecated functions + +func oldAnnotations(in map[AnnotationKey]interface{}) (out map[string]interface{}) { + if len(in) == 0 { + return nil + } + out = make(map[string]interface{}) + for k, v := range in { + out[k.String()] = v + } + return } func (m *message) Instructions() map[string]interface{} { - return oldGetAnnotations(C.pn_message_instructions(m.pn)) + return oldAnnotations(m.deliveryAnnotations) } func (m *message) Annotations() map[string]interface{} { - return oldGetAnnotations(C.pn_message_annotations(m.pn)) + return oldAnnotations(m.messageAnnotations) } func (m *message) Properties() map[string]interface{} { - return oldGetAnnotations(C.pn_message_properties(m.pn)) + return m.applicationProperties } // Convert old string-keyed annotations to an AnnotationKey map -func fixAnnotations(old map[string]interface{}) (annotations map[AnnotationKey]interface{}) { - annotations = make(map[AnnotationKey]interface{}) - for k, v := range old { - annotations[AnnotationKeyString(k)] = v +func newAnnotations(in map[string]interface{}) (out map[AnnotationKey]interface{}) { + if len(in) == 0 { + return nil + } + out = make(map[AnnotationKey]interface{}) + for k, v := range in { + out[AnnotationKeyString(k)] = v } return } func (m *message) SetInstructions(v map[string]interface{}) { - setData(fixAnnotations(v), C.pn_message_instructions(m.pn)) + m.deliveryAnnotations = newAnnotations(v) } func (m *message) SetAnnotations(v map[string]interface{}) { - setData(fixAnnotations(v), C.pn_message_annotations(m.pn)) + m.messageAnnotations = newAnnotations(v) } func (m *message) SetProperties(v map[string]interface{}) { - setData(fixAnnotations(v), C.pn_message_properties(m.pn)) + m.applicationProperties = v } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/886d2b93/go/src/qpid.apache.org/amqp/message_test.go ---------------------------------------------------------------------- diff --git a/go/src/qpid.apache.org/amqp/message_test.go b/go/src/qpid.apache.org/amqp/message_test.go index b22c60d..668ca49 100644 --- a/go/src/qpid.apache.org/amqp/message_test.go +++ b/go/src/qpid.apache.org/amqp/message_test.go @@ -58,14 +58,14 @@ func TestDefaultMessage(t *testing.T) { {"ReplyToGroupId", ""}, {"MessageId", nil}, {"CorrelationId", nil}, - {"DeliveryAnnotations", map[AnnotationKey]interface{}(nil)}, - {"MessageAnnotations", map[AnnotationKey]interface{}(nil)}, - {"ApplicationProperties", map[string]interface{}(nil)}, + {"DeliveryAnnotations", map[AnnotationKey]interface{}{}}, + {"MessageAnnotations", map[AnnotationKey]interface{}{}}, + {"ApplicationProperties", map[string]interface{}{}}, // Deprecated {"Instructions", map[string]interface{}(nil)}, {"Annotations", map[string]interface{}(nil)}, - {"Properties", map[string]interface{}(nil)}, + {"Properties", map[string]interface{}{}}, {"Body", nil}, } { ret := mv.MethodByName(x.method).Call(nil) @@ -91,7 +91,7 @@ func TestMessageString(t *testing.T) { if err := roundTrip(m); err != nil { t.Error(err) } - msgstr := `Message{user_id="user", instructions={:instructions="foo"}, annotations={:annotations="bar"}, properties={"int"=32}, body="hello"}` + msgstr := "Message{user-id: user, delivery-annotations: map[instructions:foo], message-annotations: map[annotations:bar], application-properties: map[int:32], body: hello}" if err := checkEqual(msgstr, m.String()); err != nil { t.Error(err) } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org