This is an automated email from the ASF dual-hosted git repository.

blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new a512cc8  Feature/producer properties (#222)
a512cc8 is described below

commit a512cc8482377972b4932d84bb281d1398e006e1
Author: Kristian Andersen <kanderse...@users.noreply.github.com>
AuthorDate: Fri Jun 7 10:33:45 2024 +0200

    Feature/producer properties (#222)
    
    * Add option to setup producer properties
    
    * Add changelog entry
---
 CHANGELOG.md                                     |  6 ++++++
 src/DotPulsar/Abstractions/IProducerBuilder.cs   |  5 +++++
 src/DotPulsar/Internal/Producer.cs               |  3 ++-
 src/DotPulsar/Internal/ProducerBuilder.cs        | 11 ++++++++++-
 src/DotPulsar/Internal/ProducerChannelFactory.cs |  6 +++++-
 src/DotPulsar/ProducerOptions.cs                 |  6 ++++++
 6 files changed, 34 insertions(+), 3 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index aae57f0..0186af3 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,6 +4,12 @@ All notable changes to this project will be documented in this 
file.
 
 The format is based on [Keep a 
Changelog](https://keepachangelog.com/en/1.1.0/) and this project adheres to 
[Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 
+## [3.3.0] - ?
+
+### Added
+
+- Producer properties can be added when creating a producer
+
 ## [3.2.1] - 2024-04-24
 
 ### Fixed
diff --git a/src/DotPulsar/Abstractions/IProducerBuilder.cs 
b/src/DotPulsar/Abstractions/IProducerBuilder.cs
index 74c08a7..01c99aa 100644
--- a/src/DotPulsar/Abstractions/IProducerBuilder.cs
+++ b/src/DotPulsar/Abstractions/IProducerBuilder.cs
@@ -65,6 +65,11 @@ public interface IProducerBuilder<TMessage>
     /// </summary>
     IProducerBuilder<TMessage> MaxPendingMessages(uint maxPendingMessages);
 
+    /// <summary>
+    /// Add/Set a property key/value on the producer. This is optional.
+    /// </summary>
+    IProducerBuilder<TMessage> ProducerProperty(string key, string value);
+
     /// <summary>
     /// Create the producer.
     /// </summary>
diff --git a/src/DotPulsar/Internal/Producer.cs 
b/src/DotPulsar/Internal/Producer.cs
index c5fd667..45db4b5 100644
--- a/src/DotPulsar/Internal/Producer.cs
+++ b/src/DotPulsar/Internal/Producer.cs
@@ -159,7 +159,8 @@ public sealed class Producer<TMessage> : 
IProducer<TMessage>, IRegisterEvent
         var producerName = _options.ProducerName;
         var schema = _options.Schema;
         var producerAccessMode = (PulsarApi.ProducerAccessMode) 
_options.ProducerAccessMode;
-        var factory = new ProducerChannelFactory(correlationId, 
_processManager, _connectionPool, topic, producerName, producerAccessMode, 
schema.SchemaInfo, _compressorFactory);
+        var producerProperties = _options.ProducerProperties;
+        var factory = new ProducerChannelFactory(correlationId, 
_processManager, _connectionPool, topic, producerName, producerAccessMode, 
schema.SchemaInfo, _compressorFactory, producerProperties);
         var stateManager = CreateStateManager();
         var initialChannel = new NotReadyChannel<TMessage>();
         var executor = new Executor(correlationId, _processManager, 
_exceptionHandler);
diff --git a/src/DotPulsar/Internal/ProducerBuilder.cs 
b/src/DotPulsar/Internal/ProducerBuilder.cs
index dbdac4e..2a5f025 100644
--- a/src/DotPulsar/Internal/ProducerBuilder.cs
+++ b/src/DotPulsar/Internal/ProducerBuilder.cs
@@ -21,6 +21,7 @@ public sealed class ProducerBuilder<TMessage> : 
IProducerBuilder<TMessage>
 {
     private readonly IPulsarClient _pulsarClient;
     private readonly ISchema<TMessage> _schema;
+    private readonly Dictionary<string, string> _producerProperties;
     private string? _producerName;
     private ProducerAccessMode _producerAccessMode;
     private bool _attachTraceInfoToMessages;
@@ -40,6 +41,7 @@ public sealed class ProducerBuilder<TMessage> : 
IProducerBuilder<TMessage>
         _initialSequenceId = 
ProducerOptions<TMessage>.DefaultInitialSequenceId;
         _maxPendingMessages = 500;
         _producerAccessMode = 
ProducerOptions<TMessage>.DefaultProducerAccessMode;
+        _producerProperties = [];
     }
 
     public IProducerBuilder<TMessage> AttachTraceInfoToMessages(bool 
attachTraceInfoToMessages)
@@ -96,6 +98,12 @@ public sealed class ProducerBuilder<TMessage> : 
IProducerBuilder<TMessage>
         return this;
     }
 
+    public IProducerBuilder<TMessage> ProducerProperty(string key, string 
value)
+    {
+        _producerProperties[key] = value;
+        return this;
+    }
+
     public IProducer<TMessage> Create()
     {
         if (string.IsNullOrEmpty(_topic))
@@ -112,7 +120,8 @@ public sealed class ProducerBuilder<TMessage> : 
IProducerBuilder<TMessage>
             InitialSequenceId = _initialSequenceId,
             ProducerName = _producerName,
             StateChangedHandler = _stateChangedHandler,
-            MaxPendingMessages = _maxPendingMessages
+            MaxPendingMessages = _maxPendingMessages,
+            ProducerProperties = _producerProperties
         };
 
         if (_messageRouter is not null)
diff --git a/src/DotPulsar/Internal/ProducerChannelFactory.cs 
b/src/DotPulsar/Internal/ProducerChannelFactory.cs
index 7dab27c..ce75cf4 100644
--- a/src/DotPulsar/Internal/ProducerChannelFactory.cs
+++ b/src/DotPulsar/Internal/ProducerChannelFactory.cs
@@ -36,7 +36,8 @@ public sealed class ProducerChannelFactory : 
IProducerChannelFactory
         string? producerName,
         ProducerAccessMode producerAccessMode,
         SchemaInfo schemaInfo,
-        ICompressorFactory? compressorFactory)
+        ICompressorFactory? compressorFactory,
+        Dictionary<string,string>? properties)
     {
         _correlationId = correlationId;
         _eventRegister = eventRegister;
@@ -49,6 +50,9 @@ public sealed class ProducerChannelFactory : 
IProducerChannelFactory
             Topic = topic
         };
 
+        if (properties is not null)
+            _commandProducer.Metadatas.AddRange(properties.Select(x => new 
KeyValue { Key = x.Key, Value = x.Value }));
+
         _compressorFactory = compressorFactory;
         _schema = schemaInfo.PulsarSchema;
     }
diff --git a/src/DotPulsar/ProducerOptions.cs b/src/DotPulsar/ProducerOptions.cs
index 127162f..aa6c4d4 100644
--- a/src/DotPulsar/ProducerOptions.cs
+++ b/src/DotPulsar/ProducerOptions.cs
@@ -48,6 +48,7 @@ public sealed class ProducerOptions<TMessage>
         Topic = topic;
         Schema = schema;
         MessageRouter = new RoundRobinPartitionRouter();
+        ProducerProperties = [];
     }
 
     /// <summary>
@@ -99,4 +100,9 @@ public sealed class ProducerOptions<TMessage>
     /// Set the max size of the queue holding the messages pending to receive 
an acknowledgment from the broker.
     /// </summary>
     public uint MaxPendingMessages { get; set; }
+
+    /// <summary>
+    /// Add/Set the producers's properties. This is optional.
+    /// </summary>
+    public Dictionary<string, string> ProducerProperties { get; set; }
 }

Reply via email to