This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push: new e1c3822 Expose replicated from filed on message struct (#251) e1c3822 is described below commit e1c38224cd60bc7be197dc55b55f4b8149d035bd Author: 冉小龙 <r...@apache.org> AuthorDate: Sun May 17 23:14:25 2020 +0800 Expose replicated from filed on message struct (#251) Signed-off-by: xiaolong.ran <r...@apache.org> --- pulsar/consumer_partition.go | 2 ++ pulsar/impl_message.go | 9 +++++++++ pulsar/message.go | 6 ++++++ 3 files changed, 17 insertions(+) diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index 498f2ae..4c40ae9 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -424,6 +424,7 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header msgID: msgID, payLoad: payload, replicationClusters: msgMeta.GetReplicateTo(), + replicatedFrom: msgMeta.GetReplicatedFrom(), redeliveryCount: response.GetRedeliveryCount(), } } else { @@ -436,6 +437,7 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header msgID: msgID, payLoad: payload, replicationClusters: msgMeta.GetReplicateTo(), + replicatedFrom: msgMeta.GetReplicatedFrom(), redeliveryCount: response.GetRedeliveryCount(), } } diff --git a/pulsar/impl_message.go b/pulsar/impl_message.go index 9b85c8a..3a5d4b6 100644 --- a/pulsar/impl_message.go +++ b/pulsar/impl_message.go @@ -145,6 +145,7 @@ type message struct { properties map[string]string topic string replicationClusters []string + replicatedFrom string redeliveryCount uint32 } @@ -180,6 +181,14 @@ func (msg *message) RedeliveryCount() uint32 { return msg.redeliveryCount } +func (msg *message) IsReplicated() bool { + return msg.replicatedFrom != "" +} + +func (msg *message) GetReplicatedFrom() string { + return msg.replicatedFrom +} + func newAckTracker(size int) *ackTracker { var batchIDs *big.Int if size <= 64 { diff --git a/pulsar/message.go b/pulsar/message.go index 8505321..6be35d2 100644 --- a/pulsar/message.go +++ b/pulsar/message.go @@ -93,6 +93,12 @@ type Message interface { // Message redelivery increases monotonically in a broker, when topic switch ownership to a another broker // redelivery count will be recalculated. RedeliveryCount() uint32 + + // Check whether the message is replicated from other cluster. + IsReplicated() bool + + // Get name of cluster, from which the message is replicated. + GetReplicatedFrom() string } // MessageID identifier for a particular message