This is an automated email from the ASF dual-hosted git repository.

blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 2633d59  Add trace linking. Needs testing.
2633d59 is described below

commit 2633d59475ed8d14a1e0d4131a050b89da6ffa90
Author: Daniel Blankensteiner <[email protected]>
AuthorDate: Wed Oct 26 22:18:23 2022 +0200

    Add trace linking. Needs testing.
---
 CHANGELOG.md                                      | 10 +++++-
 src/DotPulsar/Abstractions/IProducerBuilder.cs    |  5 +++
 src/DotPulsar/Internal/Constants.cs               |  4 +++
 src/DotPulsar/Internal/DotPulsarActivitySource.cs | 43 +++++++++++++++++------
 src/DotPulsar/Internal/MessageProcessor.cs        | 12 ++++---
 src/DotPulsar/Internal/Producer.cs                |  9 +++++
 src/DotPulsar/Internal/ProducerBuilder.cs         |  9 +++++
 src/DotPulsar/ProcessingOptions.cs                | 23 ++++++++----
 src/DotPulsar/ProducerOptions.cs                  |  6 ++++
 9 files changed, 98 insertions(+), 23 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index f808077..afe645e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -8,7 +8,15 @@ The format is based on [Keep a 
Changelog](https://keepachangelog.com/en/1.0.0/)
 
 ### Added
 
-- The 'Process' extension method for IConsumer\<TMessage\> can now be used for 
parallel processing of messages
+- The 'Process' extension method for IConsumer\<TMessage\> can now be used for 
parallel processing of messages. You can control:
+    - Whether ordered acknowledgment should be enforced
+    - The maximum number of messages that may be processed concurrently
+    - The maximum number of messages that may be processed per task
+    - The TaskScheduler to use for scheduling tasks
+- The trace created when sending a message can now be automatically linked to 
by traces created when consuming the message. All you have to do is:
+    - Make sure 'messaging.trace_parent' and 'messaging.trace_state' are not 
already in use in the message's metadata (properties)
+    - Set 'AttachTraceInfoToMessages' on ProducerOptions or IProducerBuilder 
when creating a producer
+    - Set 'LinkTraces' on the ProcessingOptions passed to the 'Process' 
extension method for IConsumer\<TMessage\>
 
 ## [2.4.1] - 2022-09-16
 
diff --git a/src/DotPulsar/Abstractions/IProducerBuilder.cs 
b/src/DotPulsar/Abstractions/IProducerBuilder.cs
index 6c27c80..a858af1 100644
--- a/src/DotPulsar/Abstractions/IProducerBuilder.cs
+++ b/src/DotPulsar/Abstractions/IProducerBuilder.cs
@@ -19,6 +19,11 @@ namespace DotPulsar.Abstractions;
 /// </summary>
 public interface IProducerBuilder<TMessage>
 {
+    /// <summary>
+    /// Whether to attach the sending trace's parent and state to the outgoing 
messages metadata. The default is 'false'.
+    /// </summary>
+    IProducerBuilder<TMessage> AttachTraceInfoToMessages(bool 
attachTraceInfoToMessages);
+
     /// <summary>
     /// Set the compression type. The default is 'None'.
     /// </summary>
diff --git a/src/DotPulsar/Internal/Constants.cs 
b/src/DotPulsar/Internal/Constants.cs
index 8e1b117..f579818 100644
--- a/src/DotPulsar/Internal/Constants.cs
+++ b/src/DotPulsar/Internal/Constants.cs
@@ -41,6 +41,8 @@ public static class Constants
         MetadataSizeOffset = 6;
         MetadataOffset = 10;
         ConversationId = "messaging.conversation_id";
+        TraceParent = "messaging.trace_parent";
+        TraceState = "messaging.trace_state";
         TimestampToTicks = TimeSpan.TicksPerSecond / (double) 
Stopwatch.Frequency;
     }
 
@@ -55,5 +57,7 @@ public static class Constants
     public static int MetadataSizeOffset { get; }
     public static int MetadataOffset { get; }
     public static string ConversationId { get; }
+    public static string TraceParent { get; }
+    public static string TraceState { get; }
     public static double TimestampToTicks { get; }
 }
diff --git a/src/DotPulsar/Internal/DotPulsarActivitySource.cs 
b/src/DotPulsar/Internal/DotPulsarActivitySource.cs
index 4e24458..114e215 100644
--- a/src/DotPulsar/Internal/DotPulsarActivitySource.cs
+++ b/src/DotPulsar/Internal/DotPulsarActivitySource.cs
@@ -29,12 +29,21 @@ public static class DotPulsarActivitySource
 
     public static ActivitySource ActivitySource { get; }
 
-    public static Activity? StartConsumerActivity(IMessage message, string 
operationName, KeyValuePair<string, object?>[] tags)
+    public static Activity? StartConsumerActivity(IMessage message, string 
operationName, KeyValuePair<string, object?>[] tags, bool autoLinkTraces)
     {
         if (!ActivitySource.HasListeners())
             return null;
 
-        return StartActivity(operationName, ActivityKind.Consumer, tags, 
message.GetConversationId());
+        IEnumerable<ActivityLink>? activityLinks = null;
+
+        if (autoLinkTraces)
+        {
+            var activityLink = GetActivityLink(message);
+            if (activityLink is not null)
+                activityLinks = new ActivityLink[] { activityLink.Value };
+        }
+
+        return StartActivity(operationName, ActivityKind.Consumer, tags, 
activityLinks, message.GetConversationId());
     }
 
     public static Activity? StartProducerActivity(MessageMetadata metadata, 
string operationName, KeyValuePair<string, object?>[] tags)
@@ -42,21 +51,33 @@ public static class DotPulsarActivitySource
         if (!ActivitySource.HasListeners())
             return null;
 
-        return StartActivity(operationName, ActivityKind.Producer, tags, 
metadata.GetConversationId());
+        return StartActivity(operationName, ActivityKind.Producer, tags, null, 
metadata.GetConversationId());
+    }
+
+    private static ActivityLink? GetActivityLink(IMessage message)
+    {
+        if (message.Properties.TryGetValue(Constants.TraceParent, out var 
traceParent))
+        {
+            _ = message.Properties.TryGetValue(Constants.TraceState, out var 
traceState);
+
+            if (ActivityContext.TryParse(traceParent, traceState, out var 
context))
+                return new ActivityLink(context);
+        }
+
+        return null;
     }
 
-    private static Activity? StartActivity(string operationName, ActivityKind 
kind, KeyValuePair<string, object?>[] tags, string? conversationId)
+    private static Activity? StartActivity(
+        string operationName,
+        ActivityKind kind,
+        KeyValuePair<string, object?>[] tags,
+        IEnumerable<ActivityLink>? activityLinks,
+        string? conversationId)
     {
-        var activity = ActivitySource.StartActivity(operationName, kind);
+        var activity = ActivitySource.StartActivity(kind, name: operationName, 
tags: tags, links: activityLinks);
 
         if (activity is not null && activity.IsAllDataRequested)
         {
-            for (var i = 0; i < tags.Length; ++i)
-            {
-                var tag = tags[i];
-                activity.SetTag(tag.Key, tag.Value);
-            }
-
             if (conversationId is not null)
                 activity.SetConversationId(conversationId);
         }
diff --git a/src/DotPulsar/Internal/MessageProcessor.cs 
b/src/DotPulsar/Internal/MessageProcessor.cs
index c3355f4..db6ece6 100644
--- a/src/DotPulsar/Internal/MessageProcessor.cs
+++ b/src/DotPulsar/Internal/MessageProcessor.cs
@@ -36,7 +36,8 @@ public sealed class MessageProcessor<TMessage> : IDisposable
     private readonly SemaphoreSlim _receiveLock;
     private readonly SemaphoreSlim _acknowledgeLock;
     private readonly ObjectPool<ProcessInfo> _processInfoPool;
-    private readonly bool _ensureOrderedAcknowledgement;
+    private readonly bool _linkTraces;
+    private readonly bool _ensureOrderedAcknowledgment;
     private readonly int _maxDegreeOfParallelism;
     private readonly int _maxMessagesPerTask;
     private readonly TaskScheduler _taskScheduler;
@@ -70,7 +71,8 @@ public sealed class MessageProcessor<TMessage> : IDisposable
         _acknowledgeLock = new SemaphoreSlim(1, 1);
         _processInfoPool = new DefaultObjectPool<ProcessInfo>(new 
DefaultPooledObjectPolicy<ProcessInfo>());
 
-        _ensureOrderedAcknowledgement = options.EnsureOrderedAcknowledgement;
+        _linkTraces = options.LinkTraces;
+        _ensureOrderedAcknowledgment = options.EnsureOrderedAcknowledgment;
         _maxDegreeOfParallelism = options.MaxDegreeOfParallelism;
         _maxMessagesPerTask = options.MaxMessagesPerTask;
         _taskScheduler = options.TaskScheduler;
@@ -103,7 +105,7 @@ public sealed class MessageProcessor<TMessage> : IDisposable
 
         var processInfo = new ProcessInfo();
 
-        var needToEnsureOrderedAcknowledgement = _ensureOrderedAcknowledgement 
&& _maxDegreeOfParallelism > 1;
+        var needToEnsureOrderedAcknowledgement = _ensureOrderedAcknowledgment 
&& _maxDegreeOfParallelism > 1;
         var isUnbounded = _maxMessagesPerTask == ProcessingOptions.Unbounded;
 
         while (!cancellationToken.IsCancellationRequested)
@@ -124,7 +126,7 @@ public sealed class MessageProcessor<TMessage> : IDisposable
                 _receiveLock.Release();
             }
 
-            var activity = 
DotPulsarActivitySource.StartConsumerActivity(message, _operationName, 
_activityTags);
+            var activity = 
DotPulsarActivitySource.StartConsumerActivity(message, _operationName, 
_activityTags, _linkTraces);
             if (activity is not null && activity.IsAllDataRequested)
             {
                 activity.SetMessageId(message.MessageId);
@@ -191,7 +193,7 @@ public sealed class MessageProcessor<TMessage> : IDisposable
         var processorTask = Task.Factory.StartNew(
             async () => await 
Processor(cancellationToken).ConfigureAwait(false),
             cancellationToken,
-            TaskCreationOptions.None,
+            TaskCreationOptions.DenyChildAttach,
             _taskScheduler).Unwrap();
 
         _processorTasks.AddLast(processorTask);
diff --git a/src/DotPulsar/Internal/Producer.cs 
b/src/DotPulsar/Internal/Producer.cs
index ba9fc0d..ba6567d 100644
--- a/src/DotPulsar/Internal/Producer.cs
+++ b/src/DotPulsar/Internal/Producer.cs
@@ -32,6 +32,7 @@ public sealed class Producer<TMessage> : IProducer<TMessage>, 
IRegisterEvent
     private readonly string _operationName;
     private readonly KeyValuePair<string, object?>[] _activityTags;
     private readonly KeyValuePair<string, object?>[] _meterTags;
+    private readonly bool _attachTraceInfoToMessages;
     private readonly SequenceId _sequenceId;
     private readonly StateManager<ProducerState> _state;
     private readonly IConnectionPool _connectionPool;
@@ -70,6 +71,7 @@ public sealed class Producer<TMessage> : IProducer<TMessage>, 
IRegisterEvent
         {
                 new KeyValuePair<string, object?>("topic", options.Topic)
         };
+        _attachTraceInfoToMessages = options.AttachTraceInfoToMessages;
         _sequenceId = new SequenceId(options.InitialSequenceId);
         _state = new StateManager<ProducerState>(ProducerState.Disconnected, 
ProducerState.Closed, ProducerState.Faulted);
         ServiceUrl = serviceUrl;
@@ -245,6 +247,13 @@ public sealed class Producer<TMessage> : 
IProducer<TMessage>, IRegisterEvent
             metadata.SequenceId = _sequenceId.FetchNext();
 
         var activity = DotPulsarActivitySource.StartProducerActivity(metadata, 
_operationName, _activityTags);
+        if (activity is not null && _attachTraceInfoToMessages)
+        {
+            metadata[Constants.TraceParent] = activity.Id;
+            if (activity.TraceStateString is not null)
+                metadata[Constants.TraceState] = activity.TraceStateString;
+        }
+
         var startTimestamp = DotPulsarMeter.MessageSentEnabled ? 
Stopwatch.GetTimestamp() : 0;
 
         try
diff --git a/src/DotPulsar/Internal/ProducerBuilder.cs 
b/src/DotPulsar/Internal/ProducerBuilder.cs
index 12859f4..c449319 100644
--- a/src/DotPulsar/Internal/ProducerBuilder.cs
+++ b/src/DotPulsar/Internal/ProducerBuilder.cs
@@ -22,6 +22,7 @@ public sealed class ProducerBuilder<TMessage> : 
IProducerBuilder<TMessage>
     private readonly IPulsarClient _pulsarClient;
     private readonly ISchema<TMessage> _schema;
     private string? _producerName;
+    private bool _attachTraceInfoToMessages;
     private CompressionType _compressionType;
     private ulong _initialSequenceId;
     private string? _topic;
@@ -32,10 +33,17 @@ public sealed class ProducerBuilder<TMessage> : 
IProducerBuilder<TMessage>
     {
         _pulsarClient = pulsarClient;
         _schema = schema;
+        _attachTraceInfoToMessages = false;
         _compressionType = ProducerOptions<TMessage>.DefaultCompressionType;
         _initialSequenceId = 
ProducerOptions<TMessage>.DefaultInitialSequenceId;
     }
 
+    public IProducerBuilder<TMessage> AttachTraceInfoToMessages(bool 
attachTraceInfoToMessages)
+    {
+        _attachTraceInfoToMessages = attachTraceInfoToMessages;
+        return this;
+    }
+
     public IProducerBuilder<TMessage> CompressionType(CompressionType 
compressionType)
     {
         _compressionType = compressionType;
@@ -79,6 +87,7 @@ public sealed class ProducerBuilder<TMessage> : 
IProducerBuilder<TMessage>
 
         var options = new ProducerOptions<TMessage>(_topic!, _schema)
         {
+            AttachTraceInfoToMessages = _attachTraceInfoToMessages,
             CompressionType = _compressionType,
             InitialSequenceId = _initialSequenceId,
             ProducerName = _producerName,
diff --git a/src/DotPulsar/ProcessingOptions.cs 
b/src/DotPulsar/ProcessingOptions.cs
index d01a77c..7bcca96 100644
--- a/src/DotPulsar/ProcessingOptions.cs
+++ b/src/DotPulsar/ProcessingOptions.cs
@@ -27,7 +27,8 @@ public sealed class ProcessingOptions
     /// </summary>
     public const int Unbounded = -1;
 
-    private bool _ensureOrderedAcknowledgement;
+    private bool _ensureOrderedAcknowledgment;
+    private bool _linkTraces;
     private int _maxDegreeOfParallelism;
     private int _maxMessagesPerTask;
     private TaskScheduler _taskScheduler;
@@ -37,19 +38,29 @@ public sealed class ProcessingOptions
     /// </summary>
     public ProcessingOptions()
     {
-        _ensureOrderedAcknowledgement = true;
+        _ensureOrderedAcknowledgment = true;
+        _linkTraces = false;
         _maxDegreeOfParallelism = 1;
         _maxMessagesPerTask = Unbounded;
         _taskScheduler = TaskScheduler.Default;
     }
 
     /// <summary>
-    /// Whether ordered acknowledgement should be enforced. The default is 
'true'.
+    /// Whether ordered acknowledgment should be enforced. The default is 
'true'.
     /// </summary>
-    public bool EnsureOrderedAcknowledgement
+    public bool EnsureOrderedAcknowledgment
     {
-        get => _ensureOrderedAcknowledgement;
-        set { _ensureOrderedAcknowledgement = value; }
+        get => _ensureOrderedAcknowledgment;
+        set { _ensureOrderedAcknowledgment = value; }
+    }
+
+    /// <summary>
+    /// Whether to link the process trace to the message's send trace, if 
tracing is enabled. The default is 'false'.
+    /// </summary>
+    public bool LinkTraces
+    {
+        get => _linkTraces;
+        set { _linkTraces = value; }
     }
 
     /// <summary>
diff --git a/src/DotPulsar/ProducerOptions.cs b/src/DotPulsar/ProducerOptions.cs
index 665fc4d..181eb09 100644
--- a/src/DotPulsar/ProducerOptions.cs
+++ b/src/DotPulsar/ProducerOptions.cs
@@ -36,6 +36,7 @@ public sealed class ProducerOptions<TMessage>
     /// </summary>
     public ProducerOptions(string topic, ISchema<TMessage> schema)
     {
+        AttachTraceInfoToMessages = false;
         CompressionType = DefaultCompressionType;
         InitialSequenceId = DefaultInitialSequenceId;
         Topic = topic;
@@ -43,6 +44,11 @@ public sealed class ProducerOptions<TMessage>
         MessageRouter = new RoundRobinPartitionRouter();
     }
 
+    /// <summary>
+    /// Whether to attach the sending trace's parent and state to the outgoing 
messages metadata. The default is 'false'.
+    /// </summary>
+    public bool AttachTraceInfoToMessages { get; set; }
+
     /// <summary>
     /// Set the compression type. The default is 'None'.
     /// </summary>

Reply via email to