This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new f33b594  support DisableReplication (#543)
f33b594 is described below

commit f33b594034c9433faae78a144983ab3c8f785add
Author: Yuto Furuta <mzq6mft...@gmail.com>
AuthorDate: Thu Jun 24 18:07:47 2021 +0900

    support DisableReplication (#543)
    
    Co-authored-by: Yuto Furuta <yfur...@yahoo-corp.jp>
---
 pulsar/message.go            | 3 +++
 pulsar/producer_partition.go | 5 +++++
 2 files changed, 8 insertions(+)

diff --git a/pulsar/message.go b/pulsar/message.go
index 2a4343f..23dfefb 100644
--- a/pulsar/message.go
+++ b/pulsar/message.go
@@ -48,6 +48,9 @@ type ProducerMessage struct {
        // ReplicationClusters override the replication clusters for this 
message.
        ReplicationClusters []string
 
+       // Disable the replication for this message
+       DisableReplication bool
+
        // SequenceID set the sequence id to assign to the current message
        SequenceID *int64
 
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 19aa06e..7dd176c 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -411,6 +411,11 @@ func (p *partitionProducer) internalSend(request 
*sendRequest) {
        if !sendAsBatch {
                p.internalFlushCurrentBatch()
        }
+
+       if msg.DisableReplication {
+               msg.ReplicationClusters = []string{"__local__"}
+       }
+
        added := p.batchBuilder.Add(smm, p.sequenceIDGenerator, payload, 
request,
                msg.ReplicationClusters, deliverAt)
        if !added {

Reply via email to