merlimat closed pull request #2447: Added producer/consumer properties in Go client
URL: https://github.com/apache/incubator-pulsar/pull/2447
 
 
   

This is a PR merged from a forked repository (a foreign pull request).
Because GitHub hides the original diff of such a pull request once it
is merged, the diff is reproduced below for the sake of provenance:

diff --git a/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h 
b/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
index 45154c5c6a..565a6ab0ea 100644
--- a/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
+++ b/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
@@ -128,7 +128,7 @@ class ProducerConfiguration {
     ProducerConfiguration& addEncryptionKey(std::string key);
 
     /**
-     * Check whether the message has a specific property attached.
+     * Check whether the producer has a specific property attached.
      *
      * @param name the name of the property to check
      * @return true if the message has the specified property
@@ -150,7 +150,8 @@ class ProducerConfiguration {
     std::map<std::string, std::string>& getProperties() const;
 
     /**
-     * Sets a new property on a message.
+     * Sets a new property on the producer
+     * .
      * @param name   the name of the property
      * @param value  the associated value
      */
diff --git a/pulsar-client-cpp/include/pulsar/c/consumer_configuration.h 
b/pulsar-client-cpp/include/pulsar/c/consumer_configuration.h
index 7299867f38..fca47eb06d 100644
--- a/pulsar-client-cpp/include/pulsar/c/consumer_configuration.h
+++ b/pulsar-client-cpp/include/pulsar/c/consumer_configuration.h
@@ -152,6 +152,9 @@ int 
pulsar_consumer_is_read_compacted(pulsar_consumer_configuration_t *consumer_
 void pulsar_consumer_set_read_compacted(pulsar_consumer_configuration_t 
*consumer_configuration,
                                         int compacted);
 
+void 
pulsar_consumer_configuration_set_property(pulsar_consumer_configuration_t 
*conf, const char *name,
+                                                const char *value);
+
 // const CryptoKeyReaderPtr getCryptoKeyReader()
 //
 // const;
diff --git a/pulsar-client-cpp/include/pulsar/c/producer_configuration.h 
b/pulsar-client-cpp/include/pulsar/c/producer_configuration.h
index ae88198f9a..670bf50ed8 100644
--- a/pulsar-client-cpp/include/pulsar/c/producer_configuration.h
+++ b/pulsar-client-cpp/include/pulsar/c/producer_configuration.h
@@ -134,6 +134,9 @@ void 
pulsar_producer_configuration_set_batching_max_publish_delay_ms(pulsar_prod
 unsigned long pulsar_producer_configuration_get_batching_max_publish_delay_ms(
     pulsar_producer_configuration_t *conf);
 
+void 
pulsar_producer_configuration_set_property(pulsar_producer_configuration_t 
*conf, const char *name,
+                                                const char *value);
+
 // const CryptoKeyReaderPtr getCryptoKeyReader() const;
 // ProducerConfiguration &setCryptoKeyReader(CryptoKeyReaderPtr 
cryptoKeyReader);
 //
diff --git a/pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc 
b/pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc
index c8d545383b..75cdc47ec0 100644
--- a/pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc
+++ b/pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc
@@ -113,3 +113,8 @@ void 
pulsar_consumer_set_read_compacted(pulsar_consumer_configuration_t *consume
                                         int compacted) {
     consumer_configuration->consumerConfiguration.setReadCompacted(compacted);
 }
+
+void 
pulsar_consumer_configuration_set_property(pulsar_consumer_configuration_t 
*conf, const char *name,
+                                                const char *value) {
+    conf->consumerConfiguration.setProperty(name, value);
+}
diff --git a/pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc 
b/pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc
index 914fc0ac13..a8eb5be3ec 100644
--- a/pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc
+++ b/pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc
@@ -174,3 +174,8 @@ unsigned long 
pulsar_producer_configuration_get_batching_max_publish_delay_ms(
     pulsar_producer_configuration_t *conf) {
     return conf->conf.getBatchingMaxPublishDelayMs();
 }
+
+void 
pulsar_producer_configuration_set_property(pulsar_producer_configuration_t 
*conf, const char *name,
+                                                const char *value) {
+    conf->conf.setProperty(name, value);
+}
diff --git a/pulsar-client-go/pulsar/c_consumer.go 
b/pulsar-client-go/pulsar/c_consumer.go
index 7f613b29d6..1b41a71a49 100644
--- a/pulsar-client-go/pulsar/c_consumer.go
+++ b/pulsar-client-go/pulsar/c_consumer.go
@@ -120,6 +120,18 @@ func subscribeAsync(client *client, options 
ConsumerOptions, callback func(Consu
                C.pulsar_consumer_set_consumer_name(conf, name)
        }
 
+       if options.Properties != nil {
+               for key, value := range options.Properties {
+                       cKey := C.CString(key)
+                       cValue := C.CString(value)
+
+                       C.pulsar_consumer_configuration_set_property(conf, 
cKey, cValue)
+
+                       C.free(unsafe.Pointer(cKey))
+                       C.free(unsafe.Pointer(cValue))
+               }
+       }
+
        C.pulsar_consumer_set_read_compacted(conf, cBool(options.ReadCompacted))
 
        subName := C.CString(options.SubscriptionName)
diff --git a/pulsar-client-go/pulsar/c_producer.go 
b/pulsar-client-go/pulsar/c_producer.go
index b4cd2c5e14..284315dbb2 100644
--- a/pulsar-client-go/pulsar/c_producer.go
+++ b/pulsar-client-go/pulsar/c_producer.go
@@ -124,6 +124,18 @@ func createProducerAsync(client *client, options 
ProducerOptions, callback func(
                C.pulsar_producer_configuration_set_batching_max_messages(conf, 
C.uint(options.BatchingMaxMessages))
        }
 
+       if options.Properties != nil {
+               for key, value := range options.Properties {
+                       cKey := C.CString(key)
+                       cValue := C.CString(value)
+
+                       C.pulsar_producer_configuration_set_property(conf, 
cKey, cValue)
+
+                       C.free(unsafe.Pointer(cKey))
+                       C.free(unsafe.Pointer(cValue))
+               }
+       }
+
        topicName := C.CString(options.Topic)
        defer C.free(unsafe.Pointer(topicName))
 
diff --git a/pulsar-client-go/pulsar/consumer.go 
b/pulsar-client-go/pulsar/consumer.go
index b9f2616c07..030ba1b3a8 100644
--- a/pulsar-client-go/pulsar/consumer.go
+++ b/pulsar-client-go/pulsar/consumer.go
@@ -64,6 +64,10 @@ type ConsumerOptions struct {
        // This argument is required when subscribing
        SubscriptionName string
 
+       // Attach a set of application defined properties to the consumer
+       // This properties will be visible in the topic stats
+       Properties map[string]string
+
        // Set the timeout for unacked messages
        // Message not acknowledged within the give time, will be replayed by 
the broker to the same or a different consumer
        // Default is 0, which means message are not being replayed based on 
ack time
diff --git a/pulsar-client-go/pulsar/producer.go 
b/pulsar-client-go/pulsar/producer.go
index 2cfd141633..46d6dd68ed 100644
--- a/pulsar-client-go/pulsar/producer.go
+++ b/pulsar-client-go/pulsar/producer.go
@@ -71,6 +71,10 @@ type ProducerOptions struct {
        // a topic.
        Name string
 
+       // Attach a set of application defined properties to the producer
+       // This properties will be visible in the topic stats
+       Properties map[string]string
+
        // Set the send timeout (default: 30 seconds)
        // If a message is not acknowledged by the server before the 
sendTimeout expires, an error will be reported.
        // Setting the timeout to -1, will set the timeout to infinity, which 
can be useful when using Pulsar's message
diff --git a/pulsar-client-go/pulsar/producer_test.go 
b/pulsar-client-go/pulsar/producer_test.go
index 940be85be1..cfa0bcbd1a 100644
--- a/pulsar-client-go/pulsar/producer_test.go
+++ b/pulsar-client-go/pulsar/producer_test.go
@@ -20,9 +20,9 @@
 package pulsar
 
 import (
-       "testing"
-       "fmt"
        "context"
+       "fmt"
+       "testing"
        "time"
 )
 
@@ -77,6 +77,10 @@ func TestProducer(t *testing.T) {
                MaxPendingMessages:      100,
                BlockIfQueueFull:        true,
                CompressionType:         LZ4,
+               Properties: map[string]string{
+                       "my-name": "test",
+                       "key":     "value",
+               },
        })
 
        assertNil(t, err)


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to